You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/20 17:17:08 UTC

[GitHub] [pulsar] bharanic-dev opened a new pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

bharanic-dev opened a new pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657


   ### Motivation
   
   Next set of changes for PIP-82. Add metadata-store config listener for resource groups.
   
   ### Modifications
   
   Add listener for resource-group configuration changes.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   - Added unit tests
   -  Tested by launching multiple standalone instances of pulsar-broker and adding/deleting resource-groups using pulsar-admin.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637138909



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -539,29 +549,32 @@ private void initialize() {
                     this.timeUnitScale);
         this.maxIntervalForSuppressingReportsMSecs =
                 this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
+
     }
 
-    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroupConfigInfo");
         }
 
-        if (rgConfig.getName().isEmpty()) {
+        if (rgName.isEmpty()) {
             throw new IllegalArgumentException("ResourceGroupCreate: can't create resource group with an empty name");
         }
 
-        ResourceGroup rg = getResourceGroupInternal(rgConfig.getName());
+        ResourceGroup rg = getResourceGroupInternal(rgName);
         if (rg != null) {
-            throw new PulsarAdminException("Resource group already exists:" + rgConfig.getName());
+            throw new PulsarAdminException("Resource group already exists:" + rgName);
         }
     }
 
     private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);
     private final PulsarService pulsar;
     protected final ResourceQuotaCalculator quotaCalculator;
-    private ResourceUsageTransportManager resourceUsageTransportMgr;
+    private ResourceUsageTransportManager resourceUsageTransportManagerMgr;

Review comment:
       This name was generated by the IDE (probably to differentiate the objects of the classes for which there is a name clash). I assume there must be some convention that IDE is picking and let the name be what was generated instead of changing it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637120866



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupConfigListener implements Consumer<Notification> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class);
+  private final ResourceGroupService rgService;
+  private final PulsarService pulsarService;
+  private final ResourceGroupResources rgResources;
+
+  public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {

Review comment:
       Nit: this seems to be using 2 spaces for indentation. We should stick with 4-spaces instead

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
##########
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.SNAPPY;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<byte[]> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<byte[]> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer()
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+                    .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+                    .blockIfQueueFull(false)
+                    .compressionType(SNAPPY)

Review comment:
       In general we use LZ4 as default compression scheme.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
##########
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.SNAPPY;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<byte[]> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<byte[]> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer()
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+                    .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+                    .blockIfQueueFull(false)
+                    .compressionType(SNAPPY)
+                    .create();
+        }
+
+        public ResourceUsageWriterTask() throws PulsarClientException {
+            producer = createProducer();
+            resourceUsagePublishTask = pulsarService.getExecutor().scheduleAtFixedRate(
+                    this,
+                    pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
+                    pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
+                    TimeUnit.SECONDS);
+        }
+
+        @Override
+        public void run() {
+            if (!publisherMap.isEmpty()) {
+                ResourceUsageInfo rUsageInfo = new ResourceUsageInfo();
+                rUsageInfo.setBroker(pulsarService.getBrokerServiceUrl());
+
+                publisherMap.forEach((key, item) -> item.fillResourceUsage(rUsageInfo.addUsageMap()));
+
+                ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer(rUsageInfo.getSerializedSize());
+                rUsageInfo.writeTo(buf);
+
+                byte[] bytes = buf.array();

Review comment:
       This is not safe because the array could be bigger. It would be better to use `ByteBuffer` 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
##########
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.SNAPPY;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<byte[]> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<byte[]> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer()

Review comment:
       We can create a producer with `SCHEMA.BYTEBUFFER` and then allocate a direct memory Netty ByteBuf and use `buffer.nioBuffer()` when sending to producer.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupConfigListener implements Consumer<Notification> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class);
+  private final ResourceGroupService rgService;
+  private final PulsarService pulsarService;
+  private final ResourceGroupResources rgResources;
+
+  public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
+    this.rgService = rgService;
+    this.pulsarService = pulsarService;
+    this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
+    loadAllResourceGroups();
+    this.rgResources.getStore().registerListener(this);
+  }
+
+  private void loadAllResourceGroups() {
+    rgResources.getChildrenAsync(path(RESOURCEGROUPS)).whenComplete((rgList, ex) -> {
+      if (ex != null) {
+        LOG.error("Exception when fetching resource groups", ex);
+        return;
+      }
+      final Set<String> existingSet = rgService.resourceGroupGetAll();
+      HashSet<String> newSet = new HashSet<>();
+
+      for (String rgName : rgList) {
+        newSet.add(rgName);
+      }
+
+      final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);
+
+      for (String rgName: deleteList) {
+        try {
+          rgService.resourceGroupDelete(rgName);
+        } catch (PulsarAdminException e) {
+          LOG.error("Got exception while deleting resource group {}, {}", rgName, e);
+        }
+      }
+
+      final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
+      for (String rgName: addList) {
+        final String resourceGroupPath = path(RESOURCEGROUPS, rgName);
+        pulsarService.getPulsarResources().getResourcegroupResources()
+          .getAsync(resourceGroupPath).thenAccept((optionalRg) -> {
+            ResourceGroup rg = optionalRg.get();
+            if (rgService.resourceGroupGet(rgName) == null) {
+              LOG.info("Creating resource group {}, {}", rgName, rg.toString());
+              try {
+                rgService.resourceGroupCreate(rgName, rg);
+              } catch (PulsarAdminException ex1) {
+                LOG.error("Got an exception while creating RG {}", rgName, ex1);
+              }
+            }
+          }).exceptionally((ex1) -> {
+            LOG.error("Failed to fetch resourceGroup", ex1);
+            return null;
+        });
+      }
+    });
+  }
+
+  private void updateResourceGroup(String notifyPath) {
+    String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);
+
+    rgResources.getAsync(notifyPath).whenComplete((optionalRg, ex) -> {
+      if (ex != null) {
+        LOG.error("Exception when getting resource group {}", rgName, ex);
+        return;
+      }
+      ResourceGroup rg = optionalRg.get();
+      LOG.info("RG: {}", rg.toString());

Review comment:
       This log we can probably remove

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupConfigListener implements Consumer<Notification> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class);
+  private final ResourceGroupService rgService;
+  private final PulsarService pulsarService;
+  private final ResourceGroupResources rgResources;
+
+  public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
+    this.rgService = rgService;
+    this.pulsarService = pulsarService;
+    this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
+    loadAllResourceGroups();
+    this.rgResources.getStore().registerListener(this);
+  }
+
+  private void loadAllResourceGroups() {
+    rgResources.getChildrenAsync(path(RESOURCEGROUPS)).whenComplete((rgList, ex) -> {
+      if (ex != null) {
+        LOG.error("Exception when fetching resource groups", ex);
+        return;
+      }
+      final Set<String> existingSet = rgService.resourceGroupGetAll();
+      HashSet<String> newSet = new HashSet<>();
+
+      for (String rgName : rgList) {
+        newSet.add(rgName);
+      }
+
+      final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);
+
+      for (String rgName: deleteList) {
+        try {
+          rgService.resourceGroupDelete(rgName);
+        } catch (PulsarAdminException e) {
+          LOG.error("Got exception while deleting resource group {}, {}", rgName, e);
+        }
+      }
+
+      final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
+      for (String rgName: addList) {
+        final String resourceGroupPath = path(RESOURCEGROUPS, rgName);
+        pulsarService.getPulsarResources().getResourcegroupResources()
+          .getAsync(resourceGroupPath).thenAccept((optionalRg) -> {
+            ResourceGroup rg = optionalRg.get();
+            if (rgService.resourceGroupGet(rgName) == null) {
+              LOG.info("Creating resource group {}, {}", rgName, rg.toString());
+              try {
+                rgService.resourceGroupCreate(rgName, rg);
+              } catch (PulsarAdminException ex1) {
+                LOG.error("Got an exception while creating RG {}", rgName, ex1);
+              }
+            }
+          }).exceptionally((ex1) -> {
+            LOG.error("Failed to fetch resourceGroup", ex1);
+            return null;
+        });
+      }
+    });
+  }
+
+  private void updateResourceGroup(String notifyPath) {
+    String rgName = notifyPath.substring(notifyPath.lastIndexOf('/') + 1);
+
+    rgResources.getAsync(notifyPath).whenComplete((optionalRg, ex) -> {
+      if (ex != null) {
+        LOG.error("Exception when getting resource group {}", rgName, ex);
+        return;
+      }
+      ResourceGroup rg = optionalRg.get();
+      LOG.info("RG: {}", rg.toString());
+      try {
+          LOG.info("Updating resource group {}, {}", rgName, rg.toString());
+          rgService.resourceGroupUpdate(rgName, rg);
+      } catch (PulsarAdminException ex1) {
+        LOG.error("Got an exception while creating resource group {}", rgName, ex1);
+      }
+    });
+  }
+
+  @Override
+  public void accept(Notification notification) {
+    String notifyPath = notification.getPath();
+
+    LOG.info("Metadata store notification: Path {}, Type {}", notifyPath, notification.getType());

Review comment:
       Do not log if it's not related to ResourceGroup




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655891576



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupNamespaceConfigListener.class);
+    private final ResourceGroupService rgService;
+    private final PulsarService pulsarService;
+    private final NamespaceResources namespaceResources;
+    private final TenantResources tenantResources;
+    private final ResourceGroupConfigListener rgConfigListener;
+
+    public ResourceGroupNamespaceConfigListener(ResourceGroupService rgService, PulsarService pulsarService,
+            ResourceGroupConfigListener rgConfigListener) {
+        this.rgService = rgService;
+        this.pulsarService = pulsarService;
+        this.namespaceResources = pulsarService.getPulsarResources().getNamespaceResources();
+        this.tenantResources = pulsarService.getPulsarResources().getTenantResources();
+        this.rgConfigListener = rgConfigListener;
+        loadAllNamespaceResourceGroups();
+        this.namespaceResources.getStore().registerListener(this);
+    }
+
+    private void updateNamespaceResourceGroup(String path) {
+        String nsName = path.substring(path(POLICIES).length() + 1);
+
+        namespaceResources.getAsync(path).whenCompleteAsync((optionalPolicies, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when getting namespace {}", nsName, ex);
+                return;
+            }
+            Policies policy = optionalPolicies.get();
+            reconcileNamespaceResourceGroup(nsName, policy);
+        });
+    }
+
+    private void loadAllNamespaceResourceGroups() {
+        tenantResources.getChildrenAsync(path(POLICIES)).whenComplete((tenantList, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when fetching tenants", ex);
+                return;
+            }
+            for (String ts: tenantList) {
+                namespaceResources.getChildrenAsync(path(POLICIES, ts)).whenComplete((nsList, ex1) -> {
+                    if (ex1 != null) {
+                        LOG.error("Exception when fetching namespaces", ex1);
+                    } else {
+                        for (String ns: nsList) {
+                            updateNamespaceResourceGroup(path(POLICIES, ts, ns));

Review comment:
       What if some resource groups got updated and others did not get updated successfully.  Could there be unintended behavior in that scenario?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637140554



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -539,29 +549,32 @@ private void initialize() {
                     this.timeUnitScale);
         this.maxIntervalForSuppressingReportsMSecs =
                 this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
+
     }
 
-    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroupConfigInfo");

Review comment:
       same, can I take care of it in my next commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hsaputra commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
hsaputra commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655770029



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
##########
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupConfigListener implements Consumer<Notification> {

Review comment:
       HI @bharanic-dev  - could you add JavaDoc summery for this class to explain "why" it is created and description of its primary function and maybe relationship with other parts of the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655886394



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
##########
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.LZ4;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<ByteBuffer> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<ByteBuffer> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer(Schema.BYTEBUFFER)
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+                    .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+                    .blockIfQueueFull(false)

Review comment:
       why is blockIfQueueFull set to false?  This could cause buffered messages and memory to grow unbounded.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-845308123


   @merlimat @jerrypeng @ravi-vaidyanathan @kaushik-develop PTAL when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hsaputra commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
hsaputra commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655770860



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {

Review comment:
       Similar for this class. Would be better to have Java doc header explanation on why this class is created and description of its main functions and relations with other parts of the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-845543691


   /pulsarbot run-failed-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637121030



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -117,19 +122,24 @@ public ResourceGroup resourceGroupGet(String resourceGroupName) {
      *
      * @throws if RG with that name does not exist.
      */
-    public void resourceGroupUpdate(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    public void resourceGroupUpdate(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroupConfigInfo");

Review comment:
       Nit: `ResourceGroupConfigInfo` should be `ResourceGroup` now in the exception string.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-861924860


   @merlimat, I incorporated the review feedback. please help review.
   @jerrypeng please help review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r638044426



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -70,9 +70,10 @@
 
     // Default ctor: it is not expected that anything outside of this package will need to directly
     // construct a ResourceGroup (i.e., without going through ResourceGroupService).
-    protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConfig) {
+    protected ResourceGroup(ResourceGroupService rgs, String name,
+                            org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
         this.rgs = rgs;

Review comment:
       OK.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637123495



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
##########
@@ -113,8 +113,8 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
     }
 
     private void prepareData() throws PulsarAdminException {
-        this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager");
+        this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager");
         this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
         admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
     }
-}
\ No newline at end of file
+}

Review comment:
       Overall: org.apache.pulsar.broker.resourcegroup.ResourceGroupConfigInfo class (the entire file) can now be deleted?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng merged pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
jerrypeng merged pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-866784001


   This PR introduced a new flaky test issue #11045 . @bharanic-dev Would you be able to fix it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656625868



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupNamespaceConfigListener.class);
+    private final ResourceGroupService rgService;
+    private final PulsarService pulsarService;
+    private final NamespaceResources namespaceResources;
+    private final TenantResources tenantResources;
+    private final ResourceGroupConfigListener rgConfigListener;
+
+    public ResourceGroupNamespaceConfigListener(ResourceGroupService rgService, PulsarService pulsarService,
+            ResourceGroupConfigListener rgConfigListener) {
+        this.rgService = rgService;
+        this.pulsarService = pulsarService;
+        this.namespaceResources = pulsarService.getPulsarResources().getNamespaceResources();
+        this.tenantResources = pulsarService.getPulsarResources().getTenantResources();
+        this.rgConfigListener = rgConfigListener;
+        loadAllNamespaceResourceGroups();
+        this.namespaceResources.getStore().registerListener(this);
+    }
+
+    private void updateNamespaceResourceGroup(String path) {
+        String nsName = path.substring(path(POLICIES).length() + 1);
+
+        namespaceResources.getAsync(path).whenCompleteAsync((optionalPolicies, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when getting namespace {}", nsName, ex);
+                return;
+            }
+            Policies policy = optionalPolicies.get();
+            reconcileNamespaceResourceGroup(nsName, policy);
+        });
+    }
+
+    private void loadAllNamespaceResourceGroups() {
+        tenantResources.getChildrenAsync(path(POLICIES)).whenComplete((tenantList, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when fetching tenants", ex);
+                return;
+            }
+            for (String ts: tenantList) {
+                namespaceResources.getChildrenAsync(path(POLICIES, ts)).whenComplete((nsList, ex1) -> {
+                    if (ex1 != null) {
+                        LOG.error("Exception when fetching namespaces", ex1);
+                    } else {
+                        for (String ns: nsList) {
+                            updateNamespaceResourceGroup(path(POLICIES, ts, ns));

Review comment:
       @bharanic-dev updateNamespaceResourceGroup does not update anything in zk? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656404829



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {

Review comment:
       done.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
##########
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupConfigListener implements Consumer<Notification> {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-866156974


   > Generally looks good. Lefts some comments. Please also address the other comments left by reviewers. Thank you!
   
   Done. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656377543



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
##########
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.LZ4;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<ByteBuffer> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<ByteBuffer> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer(Schema.BYTEBUFFER)
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+                    .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+                    .blockIfQueueFull(false)

Review comment:
       @jerrypeng from the documentation:
   
   "     * <p>Default is {@code false}. If set to {@code false}, send operations will immediately fail with
        * {@link ProducerQueueIsFullError} when there is no space left in pending queue. If set to
        * {@code true}, the {@link Producer#sendAsync} operation will instead block."
        
    it appears that the send operation will fail if the queue is full and that the default queue size is 1000 (from below). From this, the memory should not grow unbounded? Please let me know if I misunderstood something. I think we can set the maxPendingMessages to a lower value, but I am inclined to make that change later, once we do some testing.
    
    "   /**
        * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
        *
        * <p>When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync}
        * will fail unless {@code blockIfQueueFull=true}. Use {@link #blockIfQueueFull(boolean)}
        * to change the blocking behavior.
        *
        * <p>The producer queue size also determines the max amount of memory that will be required by
        * the client application. Until, the producer gets a successful acknowledgment back from the broker,
        * it will keep in memory (direct memory pool) all the messages in the pending queue.
        *
        * <p>Default is 1000.
        *
        * @param maxPendingMessages
        *            the max size of the pending messages queue for the producer
        * @return the producer builder instance
        */"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656426239



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -117,19 +122,24 @@ public ResourceGroup resourceGroupGet(String resourceGroupName) {
      *
      * @throws if RG with that name does not exist.
      */
-    public void resourceGroupUpdate(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    public void resourceGroupUpdate(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroupConfigInfo");

Review comment:
       done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655886394



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
##########
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.LZ4;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTopicTransportManager implements ResourceUsageTransportManager {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<ByteBuffer> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<ByteBuffer> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer(Schema.BYTEBUFFER)
+                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+                    .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+                    .blockIfQueueFull(false)

Review comment:
       why is blockIfQueueFull set to false?  This could cause buffered messages and memory to grow unbounded.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupNamespaceConfigListener.class);
+    private final ResourceGroupService rgService;
+    private final PulsarService pulsarService;
+    private final NamespaceResources namespaceResources;
+    private final TenantResources tenantResources;
+    private final ResourceGroupConfigListener rgConfigListener;
+
+    public ResourceGroupNamespaceConfigListener(ResourceGroupService rgService, PulsarService pulsarService,
+            ResourceGroupConfigListener rgConfigListener) {
+        this.rgService = rgService;
+        this.pulsarService = pulsarService;
+        this.namespaceResources = pulsarService.getPulsarResources().getNamespaceResources();
+        this.tenantResources = pulsarService.getPulsarResources().getTenantResources();
+        this.rgConfigListener = rgConfigListener;
+        loadAllNamespaceResourceGroups();
+        this.namespaceResources.getStore().registerListener(this);
+    }
+
+    private void updateNamespaceResourceGroup(String path) {
+        String nsName = path.substring(path(POLICIES).length() + 1);
+
+        namespaceResources.getAsync(path).whenCompleteAsync((optionalPolicies, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when getting namespace {}", nsName, ex);
+                return;
+            }
+            Policies policy = optionalPolicies.get();
+            reconcileNamespaceResourceGroup(nsName, policy);
+        });
+    }
+
+    private void loadAllNamespaceResourceGroups() {
+        tenantResources.getChildrenAsync(path(POLICIES)).whenComplete((tenantList, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when fetching tenants", ex);
+                return;
+            }
+            for (String ts: tenantList) {
+                namespaceResources.getChildrenAsync(path(POLICIES, ts)).whenComplete((nsList, ex1) -> {
+                    if (ex1 != null) {
+                        LOG.error("Exception when fetching namespaces", ex1);
+                    } else {
+                        for (String ns: nsList) {
+                            updateNamespaceResourceGroup(path(POLICIES, ts, ns));

Review comment:
       What if some resource groups got updated and others did not get updated successfully.  Could there be unintended behavior in that scenario?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hsaputra commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
hsaputra commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655770029



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java
##########
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import com.google.common.collect.Sets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupConfigListener implements Consumer<Notification> {

Review comment:
       HI @bharanic-dev  - could you add JavaDoc summery for this class to explain "why" it is created and description of its primary function and maybe relationship with other parts of the code.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {

Review comment:
       Similar for this class. Would be nice to have Java doc header explanation on why this class is created and description of its main functions and relations with other parts of the code.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -128,7 +130,7 @@ public ResourceGroup(ResourceGroup other) {
         }
     }
 
-    protected void updateResourceGroup(ResourceGroupConfigInfo rgConfig) {
+    protected void updateResourceGroup(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {

Review comment:
       Why change to include the full package name here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {

Review comment:
       Similar for this class. Would be better to have Java doc header explanation on why this class is created and description of its main functions and relations with other parts of the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656384572



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupNamespaceConfigListener.class);
+    private final ResourceGroupService rgService;
+    private final PulsarService pulsarService;
+    private final NamespaceResources namespaceResources;
+    private final TenantResources tenantResources;
+    private final ResourceGroupConfigListener rgConfigListener;
+
+    public ResourceGroupNamespaceConfigListener(ResourceGroupService rgService, PulsarService pulsarService,
+            ResourceGroupConfigListener rgConfigListener) {
+        this.rgService = rgService;
+        this.pulsarService = pulsarService;
+        this.namespaceResources = pulsarService.getPulsarResources().getNamespaceResources();
+        this.tenantResources = pulsarService.getPulsarResources().getTenantResources();
+        this.rgConfigListener = rgConfigListener;
+        loadAllNamespaceResourceGroups();
+        this.namespaceResources.getStore().registerListener(this);
+    }
+
+    private void updateNamespaceResourceGroup(String path) {
+        String nsName = path.substring(path(POLICIES).length() + 1);
+
+        namespaceResources.getAsync(path).whenCompleteAsync((optionalPolicies, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when getting namespace {}", nsName, ex);
+                return;
+            }
+            Policies policy = optionalPolicies.get();
+            reconcileNamespaceResourceGroup(nsName, policy);
+        });
+    }
+
+    private void loadAllNamespaceResourceGroups() {
+        tenantResources.getChildrenAsync(path(POLICIES)).whenComplete((tenantList, ex) -> {
+            if (ex != null) {
+                LOG.error("Exception when fetching tenants", ex);
+                return;
+            }
+            for (String ts: tenantList) {
+                namespaceResources.getChildrenAsync(path(POLICIES, ts)).whenComplete((nsList, ex1) -> {
+                    if (ex1 != null) {
+                        LOG.error("Exception when fetching namespaces", ex1);
+                    } else {
+                        for (String ns: nsList) {
+                            updateNamespaceResourceGroup(path(POLICIES, ts, ns));

Review comment:
       is the question that in the following scenario:
   - namespace NS1 and namespace NS2 are updated to attach to some resource groups.
   - broker was able to successfully process NS1, but the NS2 failed (for some reason).
   
   is that correct?
   I don't know why NS2 update will fail as all it is doing is updating some internal data structures (unless broker crashes due to some issue). Or there is some bug. If former, upon restart broker will process the config again. If there is a bug, we need to fix it. And to detect any failure in operations, I have added logs indicating the failure and the reason for the failure.
   
   I am also not clear if the scenario is different for other configuration changes handled by broker.
   
   can you please let me know if I misunderstood the question. thank you!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637140032



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -117,19 +122,24 @@ public ResourceGroup resourceGroupGet(String resourceGroupName) {
      *
      * @throws if RG with that name does not exist.
      */
-    public void resourceGroupUpdate(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    public void resourceGroupUpdate(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroupConfigInfo");

Review comment:
       if it is ok with you, I can take care of it it in my next commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-866937127


   @lhotari apologies, I did not see test failures in my local runs. Thank you for making the tests more robust.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637115027



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -179,7 +181,7 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
             if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
                 log.info("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
                 transportManager.unregisterResourceUsageConsumer(this.ruConsumer);
-                // ToDo: call the unregister for publisher after typo is fixed in transport manager
+                transportManager.unregisterResourceUsagePublisher(this.ruPublisher);

Review comment:
       Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-866394324


   @jerrypeng thank you for the review and for merging the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637118409



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -539,29 +549,32 @@ private void initialize() {
                     this.timeUnitScale);
         this.maxIntervalForSuppressingReportsMSecs =
                 this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
+
     }
 
-    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroupConfigInfo");
         }
 
-        if (rgConfig.getName().isEmpty()) {
+        if (rgName.isEmpty()) {
             throw new IllegalArgumentException("ResourceGroupCreate: can't create resource group with an empty name");
         }
 
-        ResourceGroup rg = getResourceGroupInternal(rgConfig.getName());
+        ResourceGroup rg = getResourceGroupInternal(rgName);
         if (rg != null) {
-            throw new PulsarAdminException("Resource group already exists:" + rgConfig.getName());
+            throw new PulsarAdminException("Resource group already exists:" + rgName);
         }
     }
 
     private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);
     private final PulsarService pulsar;
     protected final ResourceQuotaCalculator quotaCalculator;
-    private ResourceUsageTransportManager resourceUsageTransportMgr;
+    private ResourceUsageTransportManager resourceUsageTransportManagerMgr;

Review comment:
       Nit: didn't follow why there is a 2-level manage in the name (`ManageMgr`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hsaputra commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
hsaputra commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655791860



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -128,7 +130,7 @@ public ResourceGroup(ResourceGroup other) {
         }
     }
 
-    protected void updateResourceGroup(ResourceGroupConfigInfo rgConfig) {
+    protected void updateResourceGroup(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {

Review comment:
       Why change to include the full package name here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656393090



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -128,7 +130,7 @@ public ResourceGroup(ResourceGroup other) {
         }
     }
 
-    protected void updateResourceGroup(ResourceGroupConfigInfo rgConfig) {
+    protected void updateResourceGroup(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {

Review comment:
       the configuration datastructure (visible to user) and the internal datastructure are both called ResourceGroup. So, fully qualifying name was needed to dis-ambiguate.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637122625



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -539,29 +549,32 @@ private void initialize() {
                     this.timeUnitScale);
         this.maxIntervalForSuppressingReportsMSecs =
                 this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
+
     }
 
-    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroupConfigInfo");

Review comment:
       Nit: `ResourceGroupConfigInfo ` should now be be `ResourceGroup` in the exception string.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r656426458



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -539,29 +549,32 @@ private void initialize() {
                     this.timeUnitScale);
         this.maxIntervalForSuppressingReportsMSecs =
                 this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
+
     }
 
-    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroupConfigInfo");

Review comment:
       done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637135851



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -70,9 +70,10 @@
 
     // Default ctor: it is not expected that anything outside of this package will need to directly
     // construct a ResourceGroup (i.e., without going through ResourceGroupService).
-    protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConfig) {
+    protected ResourceGroup(ResourceGroupService rgs, String name,
+                            org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
         this.rgs = rgs;

Review comment:
       My understanding is this is the artifact of how the data is laid out in znode in zookeeper. The name of the resource-group is part of the path to znode. The data is the fields in the ResourceGroup datastructure. I think embedding the name in the data will likely be confusing and wasteful.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-845544825


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] ravi-vaidyanathan commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
ravi-vaidyanathan commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-845383740


   looks good to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] ravi-vaidyanathan commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
ravi-vaidyanathan commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637147345



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -70,9 +70,10 @@
 
     // Default ctor: it is not expected that anything outside of this package will need to directly
     // construct a ResourceGroup (i.e., without going through ResourceGroupService).
-    protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConfig) {
+    protected ResourceGroup(ResourceGroupService rgs, String name,
+                            org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
         this.rgs = rgs;

Review comment:
       @bharanic-dev this would also mean adding name to the ResourceGroup class right? that would not be inline with the API exposed to the user.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r638042934



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -539,29 +549,32 @@ private void initialize() {
                     this.timeUnitScale);
         this.maxIntervalForSuppressingReportsMSecs =
                 this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
+
     }
 
-    private void checkRGCreateParams(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupCreate: Invalid null ResourceGroupConfigInfo");
         }
 
-        if (rgConfig.getName().isEmpty()) {
+        if (rgName.isEmpty()) {
             throw new IllegalArgumentException("ResourceGroupCreate: can't create resource group with an empty name");
         }
 
-        ResourceGroup rg = getResourceGroupInternal(rgConfig.getName());
+        ResourceGroup rg = getResourceGroupInternal(rgName);
         if (rg != null) {
-            throw new PulsarAdminException("Resource group already exists:" + rgConfig.getName());
+            throw new PulsarAdminException("Resource group already exists:" + rgName);
         }
     }
 
     private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);
     private final PulsarService pulsar;
     protected final ResourceQuotaCalculator quotaCalculator;
-    private ResourceUsageTransportManager resourceUsageTransportMgr;
+    private ResourceUsageTransportManager resourceUsageTransportManagerMgr;

Review comment:
       My vote would be to give it a meaningful name; "ManagerMgr" sounds odd, even if it is picked by an IDE.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637121030



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
##########
@@ -117,19 +122,24 @@ public ResourceGroup resourceGroupGet(String resourceGroupName) {
      *
      * @throws if RG with that name does not exist.
      */
-    public void resourceGroupUpdate(ResourceGroupConfigInfo rgConfig) throws PulsarAdminException {
+    public void resourceGroupUpdate(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
+      throws PulsarAdminException {
         try {
             checkNotNull(rgConfig);
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroupConfigInfo");

Review comment:
       Nit: `ResourceGroupConfigInfo` should be `ResourceGroup` now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] kaushik-develop commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
kaushik-develop commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637114635



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
##########
@@ -70,9 +70,10 @@
 
     // Default ctor: it is not expected that anything outside of this package will need to directly
     // construct a ResourceGroup (i.e., without going through ResourceGroupService).
-    protected ResourceGroup(ResourceGroupService rgs, ResourceGroupConfigInfo rgConfig) {
+    protected ResourceGroup(ResourceGroupService rgs, String name,
+                            org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
         this.rgs = rgs;

Review comment:
       Nit/qs: would it make sense to have the resource-group name within the org.apache.pulsar.common.policies.data.ResourceGroup class? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#issuecomment-866865441


   I pushed a PR #11048 to fix the flaky ResourceGroupConfigListenerTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hsaputra commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
hsaputra commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r655770860



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.broker.admin.ZkAdminPaths.POLICIES;
+import static org.apache.pulsar.common.policies.path.PolicyPath.path;
+import java.util.function.Consumer;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResourceGroupNamespaceConfigListener implements Consumer<Notification> {

Review comment:
       Similar for this class. Would be nice to have Java doc header explanation on why this class is created and description of its main functions and relations with other parts of the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] bharanic-dev commented on a change in pull request #10657: [PIP-82] [pulsar-broker] Add resource-group configuration listener.

Posted by GitBox <gi...@apache.org>.
bharanic-dev commented on a change in pull request #10657:
URL: https://github.com/apache/pulsar/pull/10657#discussion_r637140932



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
##########
@@ -113,8 +113,8 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
     }
 
     private void prepareData() throws PulsarAdminException {
-        this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager");
+        this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager");
         this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
         admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
     }
-}
\ No newline at end of file
+}

Review comment:
       yes, I plan to do it in my next commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org