You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 10:30:02 UTC

[inlong] branch master updated: [INLONG-7733][Manager] Support set the rate limit for the mark-delete operation of pulsar namespace (#7736)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bca3a3dbf [INLONG-7733][Manager] Support set the rate limit for the mark-delete operation of pulsar namespace (#7736)
bca3a3dbf is described below

commit bca3a3dbf4f35cbb52ae9b6b62e0017b5045305b
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Mar 30 18:29:55 2023 +0800

    [INLONG-7733][Manager] Support set the rate limit for the mark-delete operation of pulsar namespace (#7736)
---
 .../apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java    | 3 +++
 .../org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java   | 3 +++
 .../org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java  | 3 +++
 .../apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java   | 3 +++
 .../inlong/manager/service/resource/queue/pulsar/PulsarOperator.java   | 2 +-
 5 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
index 905647f96..192301a67 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
@@ -77,6 +77,9 @@ public class InlongGroupApproveRequest {
     @ApiModelProperty(value = "The unit of message size")
     private String retentionSizeUnit;
 
+    @ApiModelProperty(value = "The limit rate of the mark-delete operation")
+    private Double maxMarkDeleteRate;
+
     @ApiModelProperty(value = "Data report type, default is 0.\n"
             + " 0: report to DataProxy and respond when the DataProxy received data.\n"
             + " 1: report to DataProxy and respond after DataProxy sends data.\n"
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
index 7f0deddd1..f8f5c911a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
@@ -80,6 +80,9 @@ public class InlongPulsarDTO extends BaseInlongGroup {
     @ApiModelProperty(value = "The unit of message size")
     private String retentionSizeUnit;
 
+    @ApiModelProperty(value = "The limit rate of the mark-delete operation")
+    private Double maxMarkDeleteRate;
+
     /**
      * Get the dto instance from the request
      */
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
index 451cccbfc..21d257598 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
@@ -76,6 +76,9 @@ public class InlongPulsarInfo extends InlongGroupInfo {
     @ApiModelProperty(value = "The unit of message size")
     private String retentionSizeUnit = "MB";
 
+    @ApiModelProperty(value = "The limit rate of the mark-delete operation")
+    private Double maxMarkDeleteRate = 0.0;
+
     public InlongPulsarInfo() {
         this.setMqType(MQType.PULSAR);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
index 7da1b625e..5a24f66e2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
@@ -77,6 +77,9 @@ public class InlongPulsarRequest extends InlongGroupRequest {
     @ApiModelProperty(value = "The unit of message size")
     private String retentionSizeUnit = "MB";
 
+    @ApiModelProperty(value = "The limit rate of the mark-delete operation")
+    private Double maxMarkDeleteRate = 0.0;
+
     public InlongPulsarRequest() {
         this.setMqType(MQType.PULSAR);
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 486d39af3..969896c99 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -133,7 +133,7 @@ public class PulsarOperator {
 
             // Configure persistence policies
             PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
-                    pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), 0);
+                    pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), pulsarInfo.getMaxMarkDeleteRate());
             namespaces.setPersistence(namespaceName, persistencePolicies);
             LOGGER.info("success to create namespace={}", namespaceName);
         } catch (PulsarAdminException e) {