You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2023/10/23 02:31:14 UTC

[pulsar] branch master updated: [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)

This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 30d59e3ff25 [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
30d59e3ff25 is described below

commit 30d59e3ff257ca00aafb773011a087b6f201cfcf
Author: houxiaoyu <ho...@apache.org>
AuthorDate: Sun Oct 22 21:31:06 2023 -0500

    [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
    
    ### Motivation
    
    See pip: https://github.com/apache/pulsar/pull/21129
    
    ### Modifications
    
    Add  `BrokerTimeAverageDataResources`
---
 .../broker/resources/LoadBalanceResources.java     | 23 ++++++++++++++++
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 32 +++++++++-------------
 .../impl/ModularLoadManagerImplTest.java           |  4 +--
 .../apache/pulsar/testclient/BrokerMonitor.java    |  6 ++--
 4 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
index 839997a7035..e13efefee0b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -24,15 +24,19 @@ import lombok.Getter;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
 
 @Getter
 public class LoadBalanceResources {
     public static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
+    public static final String BROKER_TIME_AVERAGE_BASE_PATH = "/loadbalance/broker-time-average";
 
     private final BundleDataResources bundleDataResources;
+    private final BrokerTimeAverageDataResources brokerTimeAverageDataResources;
 
     public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
         bundleDataResources = new BundleDataResources(store, operationTimeoutSec);
+        brokerTimeAverageDataResources = new BrokerTimeAverageDataResources(store, operationTimeoutSec);
     }
 
     public static class BundleDataResources extends BaseResources<BundleData> {
@@ -69,4 +73,23 @@ public class LoadBalanceResources {
             return BUNDLE_DATA_BASE_PATH + "/" + bundle;
         }
     }
+
+    public static class BrokerTimeAverageDataResources extends BaseResources<TimeAverageBrokerData> {
+        public BrokerTimeAverageDataResources(MetadataStore store, int operationTimeoutSec) {
+            super(store, TimeAverageBrokerData.class, operationTimeoutSec);
+        }
+
+        public CompletableFuture<Void> updateTimeAverageBrokerData(String brokerLookupAddress,
+                TimeAverageBrokerData data) {
+            return setWithCreateAsync(getTimeAverageBrokerDataPath(brokerLookupAddress), __ -> data);
+        }
+
+        public CompletableFuture<Void> deleteTimeAverageBrokerData(String brokerLookupAddress) {
+            return deleteAsync(getTimeAverageBrokerDataPath(brokerLookupAddress));
+        }
+
+        private String getTimeAverageBrokerDataPath(final String brokerLookupAddress) {
+            return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress;
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 586478efa50..491941d497c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -108,9 +108,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
     // Path to ZNode whose children contain ResourceQuota jsons.
     public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
 
-    // Path to ZNode containing TimeAverageBrokerData jsons for each broker.
-    public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
-
     // Set of broker candidates to reuse so that object creation is avoided.
     private final Set<String> brokerCandidateCache;
 
@@ -119,7 +116,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
     private ResourceLock<LocalBrokerData> brokerDataLock;
 
     private MetadataCache<ResourceQuota> resourceQuotaCache;
-    private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
 
     // Broker host usage object used to calculate system resource usage.
     private BrokerHostUsage brokerHostUsage;
@@ -245,7 +241,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         this.pulsarResources = pulsar.getPulsarResources();
         brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
         resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
-        timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
         pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
         pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
 
@@ -991,13 +986,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
 
             String lookupServiceAddress = pulsar.getLookupServiceAddress();
             brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
-            final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
             updateLocalBrokerData();
 
             brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
-
-            timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
-                    __ -> new TimeAverageBrokerData()).join();
+            pulsarResources.getLoadBalanceResources()
+                    .getBrokerTimeAverageDataResources()
+                    .updateTimeAverageBrokerData(lookupServiceAddress, new TimeAverageBrokerData())
+                    .join();
             updateAll();
         } catch (Exception e) {
             log.error("Unable to acquire lock for broker: [{}]", brokerZnodePath, e);
@@ -1154,9 +1149,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
             final String broker = entry.getKey();
             final TimeAverageBrokerData data = entry.getValue().getTimeAverageData();
-            futures.add(timeAverageBrokerDataCache.readModifyUpdateOrCreate(
-                    TIME_AVERAGE_BROKER_ZPATH + "/" + broker, __ -> data)
-                    .thenApply(__ -> null));
+            futures.add(pulsarResources.getLoadBalanceResources()
+                    .getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(broker, data));
         }
 
         try {
@@ -1177,13 +1171,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
     }
 
     private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
-        final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
-        timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__, ex) -> {
-            if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
-                log.warn("Failed to delete dead broker {} time "
-                        + "average data from metadata store", broker, ex);
-            }
-        });
+        pulsarResources.getLoadBalanceResources()
+                .getBrokerTimeAverageDataResources().deleteTimeAverageBrokerData(broker).whenComplete((__, ex) -> {
+                    if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
+                        log.warn("Failed to delete dead broker {} time "
+                                + "average data from metadata store", broker, ex);
+                    }
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 557393682fb..f8b5c125830 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import static java.lang.Thread.sleep;
-import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
 import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -778,7 +778,7 @@ public class ModularLoadManagerImplTest {
 
         List<String> data =  pulsar1.getLocalMetadataStore()
                 .getMetadataCache(TimeAverageBrokerData.class)
-                .getChildren(TIME_AVERAGE_BROKER_ZPATH)
+                .getChildren(BROKER_TIME_AVERAGE_BASE_PATH)
                 .join();
 
         Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index 3f896986016..a3e5a14a416 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.testclient;
 
 import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
+import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -34,7 +35,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SizeUnit;
@@ -172,7 +172,7 @@ public class BrokerMonitor {
                     final LocalBrokerData localData = (LocalBrokerData) data;
                     numBundles = localData.getNumBundles();
                     messageRate = localData.getMsgRateIn() + localData.getMsgRateOut();
-                    final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
+                    final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
                     try {
                         final TimeAverageBrokerData timeAverageData = gson.fromJson(
                                 new String(zkClient.getData(timeAveragePath, false, null)),
@@ -314,7 +314,7 @@ public class BrokerMonitor {
                 printLoadReport(broker, gson.fromJson(jsonString, LoadReport.class));
             } else {
                 final LocalBrokerData localBrokerData = gson.fromJson(jsonString, LocalBrokerData.class);
-                final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
+                final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
                 try {
                     final TimeAverageBrokerData timeAverageData = gson.fromJson(
                             new String(zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class);