Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/29 19:21:46 UTC

[GitHub] [pulsar] rdhabalia commented on a change in pull request #10391: PIP-45: Implement load managers locks using coordination service

rdhabalia commented on a change in pull request #10391:
URL: https://github.com/apache/pulsar/pull/10391#discussion_r623319212



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
##########
@@ -63,33 +53,17 @@ public void start() throws PulsarServerException {
                 + pulsar.getConfiguration().getWebServicePort().get();
         localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
                 new PulsarResourceDescription());
-        zkClient = pulsar.getZkClient();
 
-        localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+        LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
+                pulsar.getWebServiceAddressTls(),
                 pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         localData.setProtocols(pulsar.getProtocolDataToAdvertise());
-        String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+        String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
 
         try {
-            // When running in standalone, this error can happen when killing the "standalone" process
-            // ungracefully since the ZK session will not be closed and it will take some time for ZK server
-            // to prune the expired sessions after startup.
-            // Since there's a single broker instance running, it's safe, in this mode, to remove the old lock
-
-            // Delete and recreate z-node
-            try {
-                if (zkClient.exists(brokerZnodePath, null) != null) {
-                    zkClient.delete(brokerZnodePath, -1);
-                }
-            } catch (NoNodeException nne) {
-                // Ignore if z-node was just expired
-            }
-
-            ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-
-        } catch (Exception e) {
-            throw new PulsarServerException(e);
+            lockManager.acquireLock(brokerReportPath, localData).join();

Review comment:
       Can we add a log here? This is a blocking call, and the log would help troubleshoot whether the server is slow to come up or is not coming up at all.
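A minimal sketch of the kind of logging being asked for, assuming a hypothetical helper `awaitWithLogging` wrapped around the future returned by the lock call. It logs before the wait, periodically while still waiting, and after completion, so a stuck startup leaves a visible trail. It uses `java.util.logging` only so the sketch compiles without dependencies; Pulsar code itself logs through SLF4J.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

class StartupWait {
    private static final Logger log = Logger.getLogger(StartupWait.class.getName());

    // Hypothetical helper: blocks until `future` completes, logging before the wait,
    // every `logEveryMs` while still waiting, and after completion.
    static <T> T awaitWithLogging(String what, CompletableFuture<T> future, long logEveryMs) {
        log.info("Waiting for " + what);
        long start = System.nanoTime();
        while (true) {
            try {
                T result = future.get(logEveryMs, TimeUnit.MILLISECONDS);
                log.info("Done waiting for " + what + " after " + elapsedMs(start) + " ms");
                return result;
            } catch (TimeoutException e) {
                // Not done yet: leave a trace in the log and keep waiting.
                log.warning("Still waiting for " + what + " after " + elapsedMs(start) + " ms");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for " + what, e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Failed while waiting for " + what, e.getCause());
            }
        }
    }

    private static long elapsedMs(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }
}
```

In the broker this would wrap the lock acquisition, e.g. `awaitWithLogging("broker load-report lock", lockManager.acquireLock(brokerReportPath, localData), 5000)`, where the 5000 ms interval is an arbitrary illustrative value.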

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -115,15 +113,16 @@
     // Path to ZNode containing TimeAverageBrokerData jsons for each broker.
     public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
 
-    // ZooKeeper Cache of the currently available active brokers.
-    // availableActiveBrokers.get() will return a set of the broker names without an http prefix.
-    private ZooKeeperChildrenCache availableActiveBrokers;
-
     // Set of broker candidates to reuse so that object creation is avoided.
     private final Set<String> brokerCandidateCache;
 
-    // ZooKeeper cache of the local broker data, stored in LoadManager.LOADBALANCE_BROKER_ROOT.
-    private ZooKeeperDataCache<LocalBrokerData> brokerDataCache;
+    // Cache of the local broker data, stored in LoadManager.LOADBALANCE_BROKER_ROOT.
+    private LockManager<LocalBrokerData> brokersData;

Review comment:
       Can we use `MetadataCache` instead of `LockManager`? `LocalBrokerData` is metadata stored in the local ZooKeeper right now, and it doesn't need to acquire a lock.
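To make the distinction concrete, here is a minimal in-memory sketch. The class `LockVsCache` is hypothetical and is not Pulsar's `LockManager` or `MetadataCache`. A lock-style write succeeds only if nobody already holds the path, while a cache-style write has no ownership check and the last writer wins. The comment's point is that `LocalBrokerData` only needs the second kind of write.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in used only to contrast the two write semantics.
class LockVsCache {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Lock-style write: succeeds only if the path is not already held,
    // so a second writer for the same path is rejected.
    boolean tryAcquire(String path, String value) {
        return store.putIfAbsent(path, value) == null;
    }

    // Cache-style write: no ownership check; the latest value simply replaces the old one.
    void put(String path, String value) {
        store.put(path, value);
    }

    Optional<String> get(String path) {
        return Optional.ofNullable(store.get(path));
    }
}
```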

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -363,24 +343,27 @@ private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
     @Override
     public Set<String> getAvailableBrokers() {
         try {
-            return availableActiveBrokers.get();
+            return new TreeSet<>(brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).get());

Review comment:
       Is there any requirement for ordering? Can't we use a `HashSet`?
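The trade-off behind the question can be sketched as follows (hypothetical helper names). A `TreeSet` keeps broker names in sorted order at O(log n) per insert, and a `HashSet` gives no iteration order at expected O(1) per insert. Because `Set.equals` compares contents rather than implementation, callers that only test membership or compare sets behave identically either way.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helpers contrasting the two ways of materializing the broker list.
class BrokerSetChoice {
    // Sorted copy: iteration follows the natural (lexicographic) order of the names.
    static Set<String> sorted(List<String> brokers) {
        return new TreeSet<>(brokers);
    }

    // Unordered copy: no iteration-order guarantee, cheaper inserts and lookups.
    static Set<String> unordered(List<String> brokers) {
        return new HashSet<>(brokers);
    }
}
```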

##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
##########
@@ -118,6 +118,19 @@
      */
     CompletableFuture<Void> create(String path, T value);
 
+    /**
+     * Create or update an object in the metadata store.
+     * <p>
+     * This operation will make sure to keep the cache consistent.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @param value
+     *            the object to insert in metadata store
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> updateOrCreate(String path, T value);

Review comment:
       We have used `readModifyUpdateOrCreate` in a few places for this use case.
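The point is that "update or create" is a special case of read-modify-update-or-create in which the modify function ignores the current value. Below is a minimal in-memory sketch. `InMemoryMetadataCache` is hypothetical, and it assumes `readModifyUpdateOrCreate` takes a function from the current value (possibly absent) to the new value. The real `MetadataCache` is backed by a metadata store and may differ in details.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a metadata cache; not Pulsar's implementation.
class InMemoryMetadataCache<T> {
    private final ConcurrentHashMap<String, T> store = new ConcurrentHashMap<>();

    // Read the current value (empty if absent), apply `modify`, and store the result.
    // ConcurrentHashMap.compute makes the read-modify-write atomic per key here.
    CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modify) {
        T updated = store.compute(path, (key, old) -> modify.apply(Optional.ofNullable(old)));
        return CompletableFuture.completedFuture(updated);
    }

    // "Update or create" expressed through the existing method: ignore the old value.
    CompletableFuture<Void> updateOrCreate(String path, T value) {
        return readModifyUpdateOrCreate(path, ignored -> value).thenAccept(v -> { });
    }

    Optional<T> get(String path) {
        return Optional.ofNullable(store.get(path));
    }
}
```

Seen this way, a separate `updateOrCreate` in the interface is a convenience wrapper rather than a new capability.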

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -1014,12 +959,7 @@ public void writeBrokerDataOnZooKeeper(boolean force) {
             if (needBrokerDataUpdate() || force) {
                 localData.setLastUpdate(System.currentTimeMillis());
 
-                try {
-                    zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
-                } catch (KeeperException.NoNodeException e) {
-                    ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
-                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-                }
+                brokerDataLock.updateValue(localData).join();

Review comment:
       Same here: we should use a time-bounded `get()` instead of `join()`. This can deadlock the system if the future never completes.
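A minimal sketch of the suggested change, assuming a hypothetical helper `getWithTimeout` and a caller-chosen timeout. `CompletableFuture.get(long, TimeUnit)` throws `TimeoutException` instead of blocking forever the way `join()` can. The helper converts the checked exceptions into an unchecked one and restores the interrupt flag.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: a bounded replacement for future.join().
class BoundedWait {
    static <T> T getWithTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Fail instead of hanging; the caller can retry, skip this update, or surface the error.
            throw new IllegalStateException("Operation did not complete within " + timeout + " " + unit, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Operation failed", e.getCause());
        }
    }
}
```

On Java 9+, `CompletableFuture.orTimeout(long, TimeUnit)` is a non-blocking alternative that completes the future exceptionally after the deadline, which fits better if the caller can stay asynchronous.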

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -363,24 +343,27 @@ private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
     @Override
     public Set<String> getAvailableBrokers() {
         try {
-            return availableActiveBrokers.get();
+            return new TreeSet<>(brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).join());
         } catch (Exception e) {
             log.warn("Error when trying to get active brokers", e);
             return loadData.getBrokerData().keySet();
         }
     }
 
-    // Attempt to local the data for the given bundle in ZooKeeper.
+    // Attempt to local the data for the given bundle in metadata store
     // If it cannot be found, return the default bundle data.
     private BundleData getBundleDataOrDefault(final String bundle) {
         BundleData bundleData = null;
         try {
-            final String bundleZPath = getBundleDataZooKeeperPath(bundle);
-            final String quotaZPath = String.format("%s/%s", RESOURCE_QUOTA_ZPATH, bundle);
-            if (zkClient.exists(bundleZPath, null) != null) {
-                bundleData = readJson(zkClient.getData(bundleZPath, null, null), BundleData.class);
-            } else if (zkClient.exists(quotaZPath, null) != null) {
-                final ResourceQuota quota = readJson(zkClient.getData(quotaZPath, null, null), ResourceQuota.class);
+            Optional<BundleData> optBundleData = bundlesCache.get(getBundleDataPath(bundle)).join();

Review comment:
       Can we use a time-bounded `get(timeout)` instead of `join()` to break the deadlock?
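`getBundleDataOrDefault` already promises to return default bundle data when nothing is found. A time-bounded lookup could fall back in the same way. Below is a sketch with a hypothetical helper `getOrDefault`, assuming the cache lookup returns a `CompletableFuture<Optional<T>>` as in the diff. Whether a timeout should fall back to the default rather than propagate an error is a design choice the thread leaves open.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper: bounded wait on an async lookup, with a default fallback.
class BoundedLookup {
    // Returns the looked-up value, or `defaultValue` when the value is absent,
    // the lookup fails, or it does not complete within the timeout.
    static <T> T getOrDefault(CompletableFuture<Optional<T>> lookup, long timeout, TimeUnit unit,
                              Supplier<T> defaultValue) {
        try {
            return lookup.get(timeout, unit).orElseGet(defaultValue);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return defaultValue.get();
        } catch (ExecutionException | TimeoutException e) {
            return defaultValue.get();
        }
    }
}
```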




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org