You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2023/10/23 02:31:14 UTC
[pulsar] branch master updated: [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 30d59e3ff25 [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
30d59e3ff25 is described below
commit 30d59e3ff257ca00aafb773011a087b6f201cfcf
Author: houxiaoyu <ho...@apache.org>
AuthorDate: Sun Oct 22 21:31:06 2023 -0500
[refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
### Motivation
See PIP-301: https://github.com/apache/pulsar/pull/21129
### Modifications
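Add `BrokerTimeAverageDataResources` to `LoadBalanceResources`, and have `ModularLoadManagerImpl` update and delete broker time-average data through it instead of through its own `MetadataCache<TimeAverageBrokerData>`. The base path constant moves from `ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH` to `LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH` (value unchanged: `/loadbalance/broker-time-average`).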
---
.../broker/resources/LoadBalanceResources.java | 23 ++++++++++++++++
.../loadbalance/impl/ModularLoadManagerImpl.java | 32 +++++++++-------------
.../impl/ModularLoadManagerImplTest.java | 4 +--
.../apache/pulsar/testclient/BrokerMonitor.java | 6 ++--
4 files changed, 41 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
index 839997a7035..e13efefee0b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -24,15 +24,19 @@ import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
@Getter
public class LoadBalanceResources {
public static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
+ public static final String BROKER_TIME_AVERAGE_BASE_PATH = "/loadbalance/broker-time-average";
private final BundleDataResources bundleDataResources;
+ private final BrokerTimeAverageDataResources brokerTimeAverageDataResources;
public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
bundleDataResources = new BundleDataResources(store, operationTimeoutSec);
+ brokerTimeAverageDataResources = new BrokerTimeAverageDataResources(store, operationTimeoutSec);
}
public static class BundleDataResources extends BaseResources<BundleData> {
@@ -69,4 +73,23 @@ public class LoadBalanceResources {
return BUNDLE_DATA_BASE_PATH + "/" + bundle;
}
}
+
+ public static class BrokerTimeAverageDataResources extends BaseResources<TimeAverageBrokerData> {
+ public BrokerTimeAverageDataResources(MetadataStore store, int operationTimeoutSec) {
+ super(store, TimeAverageBrokerData.class, operationTimeoutSec);
+ }
+
+ public CompletableFuture<Void> updateTimeAverageBrokerData(String brokerLookupAddress,
+ TimeAverageBrokerData data) {
+ return setWithCreateAsync(getTimeAverageBrokerDataPath(brokerLookupAddress), __ -> data);
+ }
+
+ public CompletableFuture<Void> deleteTimeAverageBrokerData(String brokerLookupAddress) {
+ return deleteAsync(getTimeAverageBrokerDataPath(brokerLookupAddress));
+ }
+
+ private String getTimeAverageBrokerDataPath(final String brokerLookupAddress) {
+ return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress;
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 586478efa50..491941d497c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -108,9 +108,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Path to ZNode whose children contain ResourceQuota jsons.
public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
- // Path to ZNode containing TimeAverageBrokerData jsons for each broker.
- public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
-
// Set of broker candidates to reuse so that object creation is avoided.
private final Set<String> brokerCandidateCache;
@@ -119,7 +116,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private ResourceLock<LocalBrokerData> brokerDataLock;
private MetadataCache<ResourceQuota> resourceQuotaCache;
- private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;
@@ -245,7 +241,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
this.pulsarResources = pulsar.getPulsarResources();
brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
- timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
@@ -991,13 +986,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
String lookupServiceAddress = pulsar.getLookupServiceAddress();
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
- final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
updateLocalBrokerData();
brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
-
- timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
- __ -> new TimeAverageBrokerData()).join();
+ pulsarResources.getLoadBalanceResources()
+ .getBrokerTimeAverageDataResources()
+ .updateTimeAverageBrokerData(lookupServiceAddress, new TimeAverageBrokerData())
+ .join();
updateAll();
} catch (Exception e) {
log.error("Unable to acquire lock for broker: [{}]", brokerZnodePath, e);
@@ -1154,9 +1149,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
final String broker = entry.getKey();
final TimeAverageBrokerData data = entry.getValue().getTimeAverageData();
- futures.add(timeAverageBrokerDataCache.readModifyUpdateOrCreate(
- TIME_AVERAGE_BROKER_ZPATH + "/" + broker, __ -> data)
- .thenApply(__ -> null));
+ futures.add(pulsarResources.getLoadBalanceResources()
+ .getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(broker, data));
}
try {
@@ -1177,13 +1171,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
}
private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
- final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
- timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__, ex) -> {
- if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
- log.warn("Failed to delete dead broker {} time "
- + "average data from metadata store", broker, ex);
- }
- });
+ pulsarResources.getLoadBalanceResources()
+ .getBrokerTimeAverageDataResources().deleteTimeAverageBrokerData(broker).whenComplete((__, ex) -> {
+ if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
+ log.warn("Failed to delete dead broker {} time "
+ + "average data from metadata store", broker, ex);
+ }
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 557393682fb..f8b5c125830 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;
import static java.lang.Thread.sleep;
-import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -778,7 +778,7 @@ public class ModularLoadManagerImplTest {
List<String> data = pulsar1.getLocalMetadataStore()
.getMetadataCache(TimeAverageBrokerData.class)
- .getChildren(TIME_AVERAGE_BROKER_ZPATH)
+ .getChildren(BROKER_TIME_AVERAGE_BASE_PATH)
.join();
Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index 3f896986016..a3e5a14a416 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.testclient;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
+import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
@@ -34,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
@@ -172,7 +172,7 @@ public class BrokerMonitor {
final LocalBrokerData localData = (LocalBrokerData) data;
numBundles = localData.getNumBundles();
messageRate = localData.getMsgRateIn() + localData.getMsgRateOut();
- final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
+ final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
try {
final TimeAverageBrokerData timeAverageData = gson.fromJson(
new String(zkClient.getData(timeAveragePath, false, null)),
@@ -314,7 +314,7 @@ public class BrokerMonitor {
printLoadReport(broker, gson.fromJson(jsonString, LoadReport.class));
} else {
final LocalBrokerData localBrokerData = gson.fromJson(jsonString, LocalBrokerData.class);
- final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
+ final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
try {
final TimeAverageBrokerData timeAverageData = gson.fromJson(
new String(zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class);