You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/09/14 12:27:36 UTC
[incubator-inlong] branch master updated: [INLONG-1552]Java SDK should deal with the default flow control rule (#1556)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ceaee3e [INLONG-1552]Java SDK should deal with the default flow control rule (#1556)
ceaee3e is described below
commit ceaee3e3cf9310fe46c5535b134c66871139bf0b
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Sep 14 20:27:28 2021 +0800
[INLONG-1552]Java SDK should deal with the default flow control rule (#1556)
---
.../tubemq/client/consumer/BaseMessageConsumer.java | 21 +++++++++++++++++++--
.../corebase/policies/FlowCtrlRuleHandler.java | 6 +++---
.../corebase/policies/TestFlowCtrlRuleHandler.java | 2 +-
.../inlong/tubemq/server/broker/TubeBroker.java | 4 ++--
4 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index b7f5851..9926864 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1116,9 +1116,17 @@ public class BaseMessageConsumer implements MessageConsumer {
if (response.hasGroupFlowCheckId()) {
final int qryPriorityId = response.hasQryPriorityId()
? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+ if (response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ response.getDefFlowCheckId(), response.getDefFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Register response] found parse default flowCtrl rules failure", e1);
+ }
+ }
if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
try {
- groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
} catch (Exception e1) {
logger.warn("[Register response] found parse group flowCtrl rules failure", e1);
@@ -1143,9 +1151,18 @@ public class BaseMessageConsumer implements MessageConsumer {
if (response.hasGroupFlowCheckId()) {
final int qryPriorityId = response.hasQryPriorityId()
? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+ if (response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ response.getDefFlowCheckId(), response.getDefFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn(
+ "[Heartbeat response] found parse default flowCtrl rules failure", e1);
+ }
+ }
if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
try {
- groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
} catch (Exception e1) {
logger.warn(
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index 8039536..6b4e64b 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -92,9 +92,9 @@ public class FlowCtrlRuleHandler {
* @param flowCtrlInfo
* @throws Exception
*/
- public void updateDefFlowCtrlInfo(final int qyrPriorityId,
- final long flowCtrlId,
- final String flowCtrlInfo) throws Exception {
+ public void updateFlowCtrlInfo(final int qyrPriorityId,
+ final long flowCtrlId,
+ final String flowCtrlInfo) throws Exception {
if (flowCtrlId == this.flowCtrlId.get()) {
return;
}
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
index c83e0df..d501c46 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
@@ -41,7 +41,7 @@ public class TestFlowCtrlRuleHandler {
public void testFlowCtrlRuleHandler() {
try {
FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true);
- handler.updateDefFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
+ handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00");
Calendar rightNow = Calendar.getInstance(timeZone);
int hour = rightNow.get(Calendar.HOUR_OF_DAY);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index a4cb336..fbcc094 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -314,7 +314,7 @@ public class TubeBroker implements Stoppable {
flowCheckId = response.getFlowCheckId();
try {
flowCtrlRuleHandler
- .updateDefFlowCtrlInfo(qryPriorityId,
+ .updateFlowCtrlInfo(qryPriorityId,
flowCheckId, response.getFlowControlInfo());
} catch (Exception e1) {
logger.warn(
@@ -432,7 +432,7 @@ public class TubeBroker implements Stoppable {
if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) {
try {
flowCtrlRuleHandler
- .updateDefFlowCtrlInfo(response.getQryPriorityId(),
+ .updateFlowCtrlInfo(response.getQryPriorityId(),
response.getFlowCheckId(), response.getFlowControlInfo());
} catch (Exception e1) {
logger.warn("[Register response] found parse flowCtrl rules failure", e1);