You are viewing a plain-text version of this content; the canonical link was not preserved in this text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/09/14 12:27:36 UTC

[incubator-inlong] branch master updated: [INLONG-1552]Java SDK should deal with the default flow control rule (#1556)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ceaee3e  [INLONG-1552]Java SDK should deal with the default flow control rule (#1556)
ceaee3e is described below

commit ceaee3e3cf9310fe46c5535b134c66871139bf0b
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Sep 14 20:27:28 2021 +0800

    [INLONG-1552]Java SDK should deal with the default flow control rule (#1556)
---
 .../tubemq/client/consumer/BaseMessageConsumer.java | 21 +++++++++++++++++++--
 .../corebase/policies/FlowCtrlRuleHandler.java      |  6 +++---
 .../corebase/policies/TestFlowCtrlRuleHandler.java  |  2 +-
 .../inlong/tubemq/server/broker/TubeBroker.java     |  4 ++--
 4 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index b7f5851..9926864 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1116,9 +1116,17 @@ public class BaseMessageConsumer implements MessageConsumer {
         if (response.hasGroupFlowCheckId()) {
             final int qryPriorityId = response.hasQryPriorityId()
                     ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+            if (response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+                try {
+                    defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                            response.getDefFlowCheckId(), response.getDefFlowControlInfo());
+                } catch (Exception e1) {
+                    logger.warn("[Register response] found parse default flowCtrl rules failure", e1);
+                }
+            }
             if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
                 try {
-                    groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                    groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
                             response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn("[Register response] found parse group flowCtrl rules failure", e1);
@@ -1143,9 +1151,18 @@ public class BaseMessageConsumer implements MessageConsumer {
         if (response.hasGroupFlowCheckId()) {
             final int qryPriorityId = response.hasQryPriorityId()
                     ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+            if (response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+                try {
+                    defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                            response.getDefFlowCheckId(), response.getDefFlowControlInfo());
+                } catch (Exception e1) {
+                    logger.warn(
+                            "[Heartbeat response] found parse default flowCtrl rules failure", e1);
+                }
+            }
             if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
                 try {
-                    groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+                    groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
                             response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn(
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index 8039536..6b4e64b 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -92,9 +92,9 @@ public class FlowCtrlRuleHandler {
      * @param flowCtrlInfo
      * @throws Exception
      */
-    public void updateDefFlowCtrlInfo(final int qyrPriorityId,
-                                      final long flowCtrlId,
-                                      final String flowCtrlInfo) throws Exception {
+    public void updateFlowCtrlInfo(final int qyrPriorityId,
+                                   final long flowCtrlId,
+                                   final String flowCtrlInfo) throws Exception {
         if (flowCtrlId == this.flowCtrlId.get()) {
             return;
         }
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
index c83e0df..d501c46 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
@@ -41,7 +41,7 @@ public class TestFlowCtrlRuleHandler {
     public void testFlowCtrlRuleHandler() {
         try {
             FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true);
-            handler.updateDefFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
+            handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
             TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00");
             Calendar rightNow = Calendar.getInstance(timeZone);
             int hour = rightNow.get(Calendar.HOUR_OF_DAY);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index a4cb336..fbcc094 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -314,7 +314,7 @@ public class TubeBroker implements Stoppable {
                 flowCheckId = response.getFlowCheckId();
                 try {
                     flowCtrlRuleHandler
-                            .updateDefFlowCtrlInfo(qryPriorityId,
+                            .updateFlowCtrlInfo(qryPriorityId,
                                     flowCheckId, response.getFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn(
@@ -432,7 +432,7 @@ public class TubeBroker implements Stoppable {
             if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) {
                 try {
                     flowCtrlRuleHandler
-                            .updateDefFlowCtrlInfo(response.getQryPriorityId(),
+                            .updateFlowCtrlInfo(response.getQryPriorityId(),
                                     response.getFlowCheckId(), response.getFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn("[Register response] found parse flowCtrl rules failure", e1);