Posted to commits@rocketmq.apache.org by "lkmxsxd (via GitHub)" <gi...@apache.org> on 2023/05/24 10:55:15 UTC

[GitHub] [rocketmq-eventbridge] lkmxsxd opened a new pull request, #110: Integrate backpressure logic based on the TPS approach

lkmxsxd opened a new pull request, #110:
URL: https://github.com/apache/rocketmq-eventbridge/pull/110

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-eventbridge] Jashinck commented on a diff in pull request #110: Integrate backpressure logic based on the TPS approach

Posted by "Jashinck (via GitHub)" <gi...@apache.org>.
Jashinck commented on code in PR #110:
URL: https://github.com/apache/rocketmq-eventbridge/pull/110#discussion_r1205175435


##########
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java:
##########
@@ -51,16 +55,37 @@ public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eve
     @Override
     public void run() {
         while (!stopped) {
+            // Take the queue to pull from in this round from the task list
+            String runnerName = circulatorContext.takeBusRunnerName();

Review Comment:
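   This is not reasonable. Because the runnerName entries in the queue are never replenished, each one is gone after it has been taken once. That clearly contradicts the requirement that the main thread keep taking events for every runnerName. Transfer and Trigger must not be handled this way either.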
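
To make the concern above concrete, here is a minimal sketch of how a take-once queue drains. It does not use the project's `CirculatorContext`; the class `RunnerNameQueueSketch` and its methods are hypothetical stand-ins. The second method re-offers each name after use, which is only one possible remedy shown for contrast and is not something proposed in this review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a queue of runner names; not the project's API.
class RunnerNameQueueSketch {

    // Takes names without putting them back: each runner is served at most once.
    static List<String> drainOnce(BlockingQueue<String> names, int rounds) {
        List<String> served = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            String name = names.poll(); // returns null once the queue is empty
            if (name == null) {
                break;
            }
            served.add(name);
        }
        return served;
    }

    // Re-offers each name after use, so every runner keeps being served in turn.
    static List<String> roundRobin(BlockingQueue<String> names, int rounds) {
        List<String> served = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            String name = names.poll();
            if (name == null) {
                break;
            }
            served.add(name);
            names.offer(name); // put the name back at the tail of the queue
        }
        return served;
    }
}
```

With two runner names and four loop iterations, the first method stops after serving each name once, while the second keeps cycling through both names.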





[GitHub] [rocketmq-eventbridge] Jashinck commented on a diff in pull request #110: Integrate backpressure logic based on the TPS approach

Posted by "Jashinck (via GitHub)" <gi...@apache.org>.
Jashinck commented on code in PR #110:
URL: https://github.com/apache/rocketmq-eventbridge/pull/110#discussion_r1205178486


##########
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java:
##########
@@ -69,8 +75,27 @@ public void init() {
     @Override
     public void run() {
         while (!stopped) {
-            Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
-            if(MapUtils.isEmpty(eventRecordMap)){
+            // Take a runnerName whose transform has completed and push it
+            String runnerName = circulatorContext.takeRuleRunnerName();

Review Comment:
   Same as above.





[GitHub] [rocketmq-eventbridge] Jashinck commented on a diff in pull request #110: Integrate backpressure logic based on the TPS approach

Posted by "Jashinck (via GitHub)" <gi...@apache.org>.
Jashinck commented on code in PR #110:
URL: https://github.com/apache/rocketmq-eventbridge/pull/110#discussion_r1205176792


##########
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java:
##########
@@ -51,16 +55,37 @@ public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eve
     @Override
     public void run() {
         while (!stopped) {
+            // Take the queue to pull from in this round from the task list
+            String runnerName = circulatorContext.takeBusRunnerName();

Review Comment:
   Listener, Transfer, and Trigger should each assign a dedicated thread to every runnerName to run the pull, transfer, and trigger logic.
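
The suggestion above can be sketched roughly as follows, assuming one plain thread per runner name. The class `PerRunnerThreadsSketch`, the method `runAll`, and the bounded `rounds` parameter are hypothetical and exist only for illustration; a real Listener, Transfer, or Trigger would loop until stopped and call its own pull, transfer, or trigger logic where the counter is incremented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one dedicated thread per runner name.
class PerRunnerThreadsSketch {

    // Starts a thread for each runner name, runs `rounds` iterations of a
    // placeholder work loop on it, and returns the iteration count per runner.
    static Map<String, Integer> runAll(List<String> runnerNames, int rounds) {
        Map<String, AtomicInteger> counters = new HashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (String name : runnerNames) {
            AtomicInteger counter = new AtomicInteger();
            counters.put(name, counter);
            Thread thread = new Thread(() -> {
                for (int i = 0; i < rounds; i++) {
                    // Placeholder for the per-runner pull / transfer / trigger step.
                    counter.incrementAndGet();
                }
            }, "runner-" + name);
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // wait for this runner's loop to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for runners", e);
            }
        }
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, AtomicInteger> entry : counters.entrySet()) {
            result.put(entry.getKey(), entry.getValue().get());
        }
        return result;
    }
}
```

Because each runner owns its thread, a slow runner only delays itself, and no shared queue of names has to be refilled between iterations.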





[GitHub] [rocketmq-eventbridge] lkmxsxd closed pull request #110: Integrate backpressure logic based on the TPS approach

Posted by "lkmxsxd (via GitHub)" <gi...@apache.org>.
lkmxsxd closed pull request #110: Integrate backpressure logic based on the TPS approach
URL: https://github.com/apache/rocketmq-eventbridge/pull/110




[GitHub] [rocketmq-eventbridge] Jashinck commented on a diff in pull request #110: Integrate backpressure logic based on the TPS approach

Posted by "Jashinck (via GitHub)" <gi...@apache.org>.
Jashinck commented on code in PR #110:
URL: https://github.com/apache/rocketmq-eventbridge/pull/110#discussion_r1205178953


##########
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java:
##########
@@ -45,42 +48,95 @@ public class EventTargetTrigger extends ServiceThread {
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
     private final ErrorHandler errorHandler;
-    private volatile Integer batchSize = 100;
+    private final AbsRateEstimator absRateEstimator;
 
     public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager,
-                              ErrorHandler errorHandler) {
+                              ErrorHandler errorHandler, AbsRateEstimator absRateEstimator) {
         this.circulatorContext = circulatorContext;
         this.offsetManager = offsetManager;
         this.errorHandler = errorHandler;
+        this.absRateEstimator = absRateEstimator;
     }
 
     @Override
     public void run() {
         while (!stopped) {
-            Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize);
-            if (MapUtils.isEmpty(targetRecordMap)) {
+            // Take a runnerName whose transform has completed and push it
+            String runnerName = circulatorContext.takeTargetRunnerName();

Review Comment:
   Same as above.


