Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/01/12 06:14:18 UTC
[GitHub] [rocketmq] lwclover opened a new pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
lwclover opened a new pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555
Fixes issue [#3556](https://github.com/apache/rocketmq/issues/3556).
The RocketMQ client updates topic route information every 30 seconds.
When a broker is down, the RocketMQ client cannot connect to it. An async send is not retried and throws a RemotingConnectException.
The RemotingConnectException is shown below:
```
org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [5]ms, Topic: SSP_VC_COMMAND_CFG_PULL_PUSH_RESPOND_RECORD, BrokersSent: [broker-ssp-11]
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:610)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:87)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$2.run(DefaultMQProducerImpl.java:467)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <x.x.x.x10911> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:540)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:375)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf(MQClientAPIImpl.java:332)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf$accessor$wRKVBfSV(MQClientAPIImpl.java)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$auxiliary$5onkBFUr.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:765)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:529)
... 7 common frames omitted
```
The RemotingConnectException is raised by `throw new RemotingConnectException(addr);` in `NettyRemotingClient.invokeAsync`:
```
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
    throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
    RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsync call timeout");
            }
            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
```
For CommunicationMode.ASYNC, `timesTotal` is 1, so when a RemotingConnectException is thrown, `sendDefaultImpl` cannot retry:
```
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException | MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }
                        throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));
        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
```
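To make the retry budget concrete, here is a minimal, self-contained sketch that copies only the `timesTotal` expression from `sendDefaultImpl` above. Everything else is an assumption for illustration: the class `RetryBudgetSketch`, the method `attemptAll`, the broker names, and the round-robin broker choice (the real client uses `selectOneMessageQueue`). It pretends every attempt fails with a connect error and records which brokers would be tried.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RetryBudgetSketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // Same expression as timesTotal in sendDefaultImpl: only SYNC gets extra attempts.
    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Hypothetical simulation: every attempt is assumed to fail with a connect
    // error, so the loop simply runs until the attempt budget is exhausted.
    static List<String> attemptAll(CommunicationMode mode, int retries, List<String> brokers) {
        List<String> tried = new ArrayList<>();
        int total = timesTotal(mode, retries);
        for (int times = 0; times < total; times++) {
            // Round-robin choice is an assumption, not the real selection logic.
            tried.add(brokers.get(times % brokers.size()));
        }
        return tried;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        System.out.println("SYNC  tried: " + attemptAll(CommunicationMode.SYNC, 2, brokers));
        System.out.println("ASYNC tried: " + attemptAll(CommunicationMode.ASYNC, 2, brokers));
    }
}
```

With a retry setting of 2, the SYNC call gets three attempts and can move on to other brokers, while the ASYNC call gets exactly one, which matches the "Send [1] times, still failed" message in the stack trace.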
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] coveralls edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448525
[![Coverage Status](https://coveralls.io/builds/45556931/badge)](https://coveralls.io/builds/45556931)
Coverage decreased (-0.1%) to 53.143% when pulling **c424e1ad81410ae29f7c35dfd576a7be4c2dd58c on lwclover:develop** into **28d78498544c07524fe70454337862c7c43a781a on apache:develop**.
[GitHub] [rocketmq] lwclover commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775853192
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- long cost = System.currentTimeMillis() - beginStartTime;
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
-
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- if (context != null && sendResult != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- } catch (Throwable e) {
- }
-
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- return;
- }
-
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
+ try {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ long cost = System.currentTimeMillis() - beginStartTime;
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (null == sendCallback && response != null) {
try {
- sendCallback.onSuccess(sendResult);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ if (context != null && sendResult != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
+ return;
}
- } else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+
+ if (response != null) {
+ try {
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ assert sendResult != null;
+ if (context != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
+
+ try {
+ sendCallback.onSuccess(sendResult);
+ } catch (Throwable e) {
+ }
+
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ } catch (Exception e) {
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, e, context, false, producer);
+ }
} else {
- MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ if (!responseFuture.isSendRequestOK()) {
+ MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else if (responseFuture.isTimeout()) {
+ MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+ responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else {
+ MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ }
}
}
- }
- });
+ });
+ } catch (RemotingConnectException ex) {
Review comment:
Catch `RemotingConnectException`, `RemotingSendRequestException`, and `RemotingTimeoutException` here; fail fast on `RemotingTooMuchRequestException`.
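
The shape of this change can be sketched in isolation. The following is not the code in this PR: the class `AsyncConnectFailureSketch`, its nested exception types, the `Callback` interface, and the address `127.0.0.1:10911` are hypothetical stand-ins, and forwarding the caught exception to the same handler the callback would use is an assumption about what the catch block does. It only illustrates catching a connect failure that is thrown synchronously while letting the "too much request" case propagate to the caller.

```java
import java.util.ArrayList;
import java.util.List;

class AsyncConnectFailureSketch {
    // Hypothetical stand-ins for the RocketMQ exception types named above.
    static class ConnectException extends Exception {
        ConnectException(String addr) { super("connect to <" + addr + "> failed"); }
    }
    static class TooMuchRequestException extends Exception {
        TooMuchRequestException(String msg) { super(msg); }
    }

    // Stand-in for the failure path that would normally run inside the callback.
    interface Callback { void onException(Throwable e); }

    // Records failures that reached the retry handler, for demonstration only.
    static final List<String> retried = new ArrayList<>();

    // Stand-in for invokeAsync: both failures are thrown on the caller's thread,
    // before any callback has been registered.
    static void invokeAsync(String addr, boolean channelActive, long remainingMillis, Callback cb)
            throws ConnectException, TooMuchRequestException {
        if (!channelActive) {
            throw new ConnectException(addr);
        }
        if (remainingMillis < 0) {
            throw new TooMuchRequestException("invokeAsync call timeout");
        }
        // A real client would write the request here and invoke cb later.
    }

    static void sendMessageAsync(String addr, boolean channelActive, long remainingMillis)
            throws TooMuchRequestException {
        Callback retryHandler = e -> retried.add(e.getMessage());
        try {
            invokeAsync(addr, channelActive, remainingMillis, retryHandler);
        } catch (ConnectException ex) {
            // Assumption: route the synchronous failure to the same handler.
            retryHandler.onException(ex);
        }
        // TooMuchRequestException is deliberately not caught: fail fast.
    }

    public static void main(String[] args) throws Exception {
        sendMessageAsync("127.0.0.1:10911", false, 1000L);
        System.out.println("handled by retry path: " + retried);
    }
}
```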
[GitHub] [rocketmq] duhenglucky commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
duhenglucky commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-1010579763
@lwclover Wait for CI checks
[GitHub] [rocketmq] coveralls edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448525
[![Coverage Status](https://coveralls.io/builds/45259629/badge)](https://coveralls.io/builds/45259629)
Coverage decreased (-0.02%) to 53.261% when pulling **a67469e919e456c8014276fd0b462837d92d1e26 on lwclover:develop** into **28d78498544c07524fe70454337862c7c43a781a on apache:develop**.
[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448661
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3555](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7bb99a7) into [develop](https://codecov.io/gh/apache/rocketmq/commit/28d78498544c07524fe70454337862c7c43a781a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28d7849) will **decrease** coverage by `0.12%`.
> The diff coverage is `26.31%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3555/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## develop #3555 +/- ##
=============================================
- Coverage 47.32% 47.20% -0.13%
+ Complexity 5038 5018 -20
=============================================
Files 627 627
Lines 41348 41352 +4
Branches 5372 5372
=============================================
- Hits 19568 19520 -48
- Misses 19356 19410 +54
+ Partials 2424 2422 -2
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `12.92% <26.31%> (+0.05%)` | :arrow_up: |
| [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
| [...ketmq/common/protocol/body/ConsumerConnection.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9Db25zdW1lckNvbm5lY3Rpb24uamF2YQ==) | `95.83% <0.00%> (-4.17%)` | :arrow_down: |
| [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `46.88% <0.00%> (-4.03%)` | :arrow_down: |
| [...lient/impl/consumer/DefaultMQPushConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TVFQdXNoQ29uc3VtZXJJbXBsLmphdmE=) | `39.47% <0.00%> (-2.29%)` | :arrow_down: |
| [...rocketmq/client/impl/factory/MQClientInstance.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9mYWN0b3J5L01RQ2xpZW50SW5zdGFuY2UuamF2YQ==) | `51.30% <0.00%> (-2.00%)` | :arrow_down: |
| [...mq/client/impl/consumer/RebalanceLitePullImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VMaXRlUHVsbEltcGwuamF2YQ==) | `48.52% <0.00%> (-1.48%)` | :arrow_down: |
| [...e/rocketmq/client/impl/consumer/RebalanceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VJbXBsLmphdmE=) | `41.40% <0.00%> (-1.18%)` | :arrow_down: |
| [...nt/impl/consumer/ConsumeMessageOrderlyService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Db25zdW1lTWVzc2FnZU9yZGVybHlTZXJ2aWNlLmphdmE=) | `43.68% <0.00%> (-0.73%)` | :arrow_down: |
| [...ent/impl/consumer/DefaultLitePullConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TGl0ZVB1bGxDb25zdW1lckltcGwuamF2YQ==) | `67.99% <0.00%> (-0.52%)` | :arrow_down: |
| ... and [10 more](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [28d7849...7bb99a7](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [rocketmq] coveralls edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448525
[![Coverage Status](https://coveralls.io/builds/45623106/badge)](https://coveralls.io/builds/45623106)
Coverage decreased (-2.2%) to 51.08% when pulling **895decbfc15f4c1b10b649875a96c0fbd1b4f2f3 on lwclover:develop** into **28d78498544c07524fe70454337862c7c43a781a on apache:develop**.
[GitHub] [rocketmq] MatrixHB commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
MatrixHB commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-996681362
@lwclover you are right.
When the channel to the broker cannot be established, execution goes straight into the following code block.
![image](https://user-images.githubusercontent.com/23614576/146499518-c1dc2795-b0e7-41c8-89a7-4dee105872a7.png)
The retry for async send only covers failures during the execution of "org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl", but in this case "invokeAsyncImpl" has not been executed yet.
[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448661
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3555](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (895decb) into [develop](https://codecov.io/gh/apache/rocketmq/commit/28d78498544c07524fe70454337862c7c43a781a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28d7849) will **decrease** coverage by `0.30%`.
> The diff coverage is `35.89%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3555/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## develop #3555 +/- ##
=============================================
- Coverage 47.32% 47.02% -0.31%
+ Complexity 5038 4858 -180
=============================================
Files 627 636 +9
Lines 41348 42249 +901
Branches 5372 5521 +149
=============================================
+ Hits 19568 19867 +299
- Misses 19356 19891 +535
- Partials 2424 2491 +67
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `13.98% <35.89%> (+1.10%)` | :arrow_up: |
| [...ketmq/common/protocol/body/ConsumerConnection.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9Db25zdW1lckNvbm5lY3Rpb24uamF2YQ==) | `95.83% <0.00%> (-4.17%)` | :arrow_down: |
| [...a/org/apache/rocketmq/filter/util/BloomFilter.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CbG9vbUZpbHRlci5qYXZh) | `59.13% <0.00%> (-2.16%)` | :arrow_down: |
| [...ava/org/apache/rocketmq/filter/util/BitsArray.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CaXRzQXJyYXkuamF2YQ==) | `58.11% <0.00%> (-1.71%)` | :arrow_down: |
| [...a/org/apache/rocketmq/store/StoreStatsService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL1N0b3JlU3RhdHNTZXJ2aWNlLmphdmE=) | `28.66% <0.00%> (-1.31%)` | :arrow_down: |
| [...a/org/apache/rocketmq/broker/BrokerController.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvQnJva2VyQ29udHJvbGxlci5qYXZh) | `46.54% <0.00%> (-0.89%)` | :arrow_down: |
| [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `55.48% <0.00%> (-0.76%)` | :arrow_down: |
| [...e/rocketmq/client/impl/consumer/RebalanceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VJbXBsLmphdmE=) | `42.18% <0.00%> (-0.40%)` | :arrow_down: |
| [...ocketmq/broker/processor/SendMessageProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1NlbmRNZXNzYWdlUHJvY2Vzc29yLmphdmE=) | `39.59% <0.00%> (-0.30%)` | :arrow_down: |
| [...java/org/apache/rocketmq/broker/BrokerStartup.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvQnJva2VyU3RhcnR1cC5qYXZh) | `5.47% <0.00%> (-0.20%)` | :arrow_down: |
| ... and [28 more](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [28d7849...895decb](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] lwclover commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986641891
> Agreed. In asynchronous sending mode there is no retry when connection establishment fails; a retry happens only when the request or the response fails.
Thanks, you understand my point.
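To illustrate the point quoted above, here is a minimal sketch, not the RocketMQ code. It assumes a transport whose async call throws synchronously when the connection cannot be established, so the completion callback never runs and any retry logic inside the callback is skipped. `ConnectFailedException`, `Callback`, `invokeAsync` and `sendWithRetry` are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AsyncConnectRetrySketch {
    /** Hypothetical stand-in for a failure raised before any request is sent. */
    static class ConnectFailedException extends Exception {
        ConnectFailedException(String addr) {
            super("connect to <" + addr + "> failed");
        }
    }

    /** Hypothetical completion hook for an async send. */
    interface Callback {
        void onComplete(boolean ok);
    }

    /** Hypothetical transport: addresses listed in `down` fail synchronously at connect time. */
    static void invokeAsync(String addr, List<String> down, Callback cb) throws ConnectFailedException {
        if (down.contains(addr)) {
            throw new ConnectFailedException(addr); // the callback is never invoked on this path
        }
        cb.onComplete(true);
    }

    /** Tries brokers in order; a connect failure moves on to the next broker instead of escaping. */
    static List<String> sendWithRetry(List<String> brokers, List<String> down, int maxAttempts) {
        List<String> attempted = new ArrayList<>();
        for (int i = 0; i < maxAttempts && i < brokers.size(); i++) {
            String addr = brokers.get(i);
            attempted.add(addr);
            final boolean[] delivered = {false};
            try {
                invokeAsync(addr, down, ok -> delivered[0] = ok);
            } catch (ConnectFailedException e) {
                continue; // without this catch the exception escapes and no retry happens
            }
            if (delivered[0]) {
                break;
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a:10911", "broker-b:10911");
        List<String> down = Arrays.asList("broker-a:10911");
        // prints [broker-a:10911, broker-b:10911]
        System.out.println(sendWithRetry(brokers, down, 3));
    }
}
```

The only point of the sketch is the `try`/`catch` around the async call: a connect failure has to be handled at the call site because it never reaches the callback.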
[GitHub] [rocketmq] vongosling commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-1001816116
Backoff strategy optimization is one of the improvements I would most like to see in RocketMQ. You should consider the retry tuple here: by the time the callback is invoked, the retries in async mode have already finished.
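For readers unfamiliar with the term, one common form of backoff is capped exponential delay between retries. The sketch below is a generic illustration under that assumption; it is not RocketMQ's retry logic, and `backoffMillis` and its parameters are hypothetical.

```java
final class BackoffSketch {
    /** Delay before retry number `attempt` (1-based): baseMillis * 2^(attempt - 1), capped at maxMillis. */
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Keep the shift small; a very large baseMillis could still overflow.
        int shift = Math.min(attempt - 1, 30);
        long delay = baseMillis * (1L << shift);
        return Math.min(delay, maxMillis);
    }

    public static void main(String[] args) {
        // With base 100 ms and cap 1000 ms the delays are 100, 200, 400, 800, 1000.
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println(attempt + " -> " + backoffMillis(attempt, 100, 1000) + " ms");
        }
    }
}
```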
[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448661
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3555](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a67469e) into [develop](https://codecov.io/gh/apache/rocketmq/commit/28d78498544c07524fe70454337862c7c43a781a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28d7849) will **decrease** coverage by `0.04%`.
> The diff coverage is `25.64%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3555/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@             Coverage Diff              @@
##             develop    #3555     +/-   ##
=============================================
- Coverage      47.32%   47.27%   -0.05%
+ Complexity      5038     5032       -6
=============================================
  Files            627      627
  Lines          41348    41353       +5
  Branches        5372     5372
=============================================
- Hits           19568    19551      -17
- Misses         19356    19376      +20
- Partials        2424     2426       +2
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `12.91% <25.64%> (+0.03%)` | :arrow_up: |
| [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
| [...ketmq/common/protocol/body/ConsumerConnection.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9Db25zdW1lckNvbm5lY3Rpb24uamF2YQ==) | `95.83% <0.00%> (-4.17%)` | :arrow_down: |
| [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `46.88% <0.00%> (-4.03%)` | :arrow_down: |
| [...a/org/apache/rocketmq/filter/util/BloomFilter.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CbG9vbUZpbHRlci5qYXZh) | `59.13% <0.00%> (-2.16%)` | :arrow_down: |
| [...ent/impl/consumer/DefaultLitePullConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TGl0ZVB1bGxDb25zdW1lckltcGwuamF2YQ==) | `67.99% <0.00%> (-0.52%)` | :arrow_down: |
| [...e/rocketmq/client/impl/consumer/RebalanceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VJbXBsLmphdmE=) | `42.18% <0.00%> (-0.40%)` | :arrow_down: |
| [...etmq/client/latency/LatencyFaultToleranceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvbGF0ZW5jeS9MYXRlbmN5RmF1bHRUb2xlcmFuY2VJbXBsLmphdmE=) | `50.00% <0.00%> (ø)` | |
| [...org/apache/rocketmq/common/stats/StatsItemSet.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vc3RhdHMvU3RhdHNJdGVtU2V0LmphdmE=) | `43.28% <0.00%> (+1.49%)` | :arrow_up: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [28d7849...a67469e](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [rocketmq] lwclover commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775845562
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- long cost = System.currentTimeMillis() - beginStartTime;
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
-
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- if (context != null && sendResult != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- } catch (Throwable e) {
- }
-
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- return;
- }
-
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
+ try {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ long cost = System.currentTimeMillis() - beginStartTime;
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (null == sendCallback && response != null) {
try {
- sendCallback.onSuccess(sendResult);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ if (context != null && sendResult != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
+ return;
}
- } else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+
+ if (response != null) {
+ try {
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ assert sendResult != null;
+ if (context != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
+
+ try {
+ sendCallback.onSuccess(sendResult);
+ } catch (Throwable e) {
+ }
+
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ } catch (Exception e) {
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, e, context, false, producer);
+ }
} else {
- MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ if (!responseFuture.isSendRequestOK()) {
+ MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else if (responseFuture.isTimeout()) {
+ MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+ responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else {
+ MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ }
}
}
- }
- });
+ });
+ } catch (RemotingConnectException ex) {
Review comment:
RemotingTooMuchRequestException should not be retried.
[GitHub] [rocketmq] lwclover commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775858675
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
+ } catch (RemotingConnectException ex) {
Review comment:
done
[GitHub] [rocketmq] lwclover commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775853192
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
+ } catch (RemotingConnectException ex) {
Review comment:
Catch RemotingConnectException, RemotingSendRequestException, and RemotingTimeoutException; fail fast on RemotingTooMuchRequestException.
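The rule in this comment can be sketched as a small classification helper. This is an illustration only, not the code in the PR: the nested exception classes are local stand-ins that merely share their simple names with the RocketMQ exceptions mentioned above, and `shouldRetry` is a hypothetical method.

```java
final class RetryPolicySketch {
    // Local stand-ins so the example compiles on its own; the real exception
    // classes are not imported here.
    static class RemotingException extends Exception {
        RemotingException(String message) { super(message); }
    }
    static class RemotingConnectException extends RemotingException {
        RemotingConnectException(String message) { super(message); }
    }
    static class RemotingSendRequestException extends RemotingException {
        RemotingSendRequestException(String message) { super(message); }
    }
    static class RemotingTimeoutException extends RemotingException {
        RemotingTimeoutException(String message) { super(message); }
    }
    static class RemotingTooMuchRequestException extends RemotingException {
        RemotingTooMuchRequestException(String message) { super(message); }
    }

    /** Returns true for the three failures the comment says to catch, false for everything else. */
    static boolean shouldRetry(Exception e) {
        if (e instanceof RemotingTooMuchRequestException) {
            return false; // fail fast: the client is already overloaded
        }
        return e instanceof RemotingConnectException
                || e instanceof RemotingSendRequestException
                || e instanceof RemotingTimeoutException;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new RemotingConnectException("connect failed")));     // true
        System.out.println(shouldRetry(new RemotingTooMuchRequestException("too many")));    // false
    }
}
```

Listing the retryable types explicitly, rather than catching the common base type, keeps any other failure on the fail-fast path by default.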
[GitHub] [rocketmq] codecov-commenter commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448661
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3555](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (559d136) into [develop](https://codecov.io/gh/apache/rocketmq/commit/a82306853d5a5902a180ed07f660e45e4bab588e?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a823068) will **decrease** coverage by `0.10%`.
> The diff coverage is `100.00%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3555/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## develop #3555 +/- ##
=============================================
- Coverage 48.84% 48.73% -0.11%
+ Complexity 4654 4642 -12
=============================================
Files 555 555
Lines 36733 36733
Branches 4838 4838
=============================================
- Hits 17943 17903 -40
- Misses 16544 16588 +44
+ Partials 2246 2242 -4
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...mq/client/impl/producer/DefaultMQProducerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9wcm9kdWNlci9EZWZhdWx0TVFQcm9kdWNlckltcGwuamF2YQ==) | `46.56% <100.00%> (+0.12%)` | :arrow_up: |
| [...tmq/logappender/log4j2/RocketmqLog4j2Appender.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bG9nYXBwZW5kZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL2xvZ2FwcGVuZGVyL2xvZzRqMi9Sb2NrZXRtcUxvZzRqMkFwcGVuZGVyLmphdmE=) | `35.00% <0.00%> (-10.00%)` | :arrow_down: |
| [...in/java/org/apache/rocketmq/test/util/MQAdmin.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW4uamF2YQ==) | `38.88% <0.00%> (-5.56%)` | :arrow_down: |
| [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
| [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `47.05% <0.00%> (-4.05%)` | :arrow_down: |
| [...he/rocketmq/client/impl/consumer/ProcessQueue.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Qcm9jZXNzUXVldWUuamF2YQ==) | `58.60% <0.00%> (-3.73%)` | :arrow_down: |
| [...nt/impl/consumer/ConsumeMessageOrderlyService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Db25zdW1lTWVzc2FnZU9yZGVybHlTZXJ2aWNlLmphdmE=) | `38.98% <0.00%> (-3.62%)` | :arrow_down: |
| [...ava/org/apache/rocketmq/filter/util/BitsArray.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CaXRzQXJyYXkuamF2YQ==) | `59.82% <0.00%> (-2.57%)` | :arrow_down: |
| [...ent/impl/consumer/DefaultLitePullConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TGl0ZVB1bGxDb25zdW1lckltcGwuamF2YQ==) | `67.99% <0.00%> (-1.56%)` | :arrow_down: |
| [...a/org/apache/rocketmq/store/StoreStatsService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL1N0b3JlU3RhdHNTZXJ2aWNlLmphdmE=) | `28.85% <0.00%> (-0.66%)` | :arrow_down: |
| ... and [10 more](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [rocketmq] duhenglucky commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
duhenglucky commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775823162
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- long cost = System.currentTimeMillis() - beginStartTime;
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
-
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- if (context != null && sendResult != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- } catch (Throwable e) {
- }
-
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- return;
- }
-
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
+ try {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ long cost = System.currentTimeMillis() - beginStartTime;
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (null == sendCallback && response != null) {
try {
- sendCallback.onSuccess(sendResult);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ if (context != null && sendResult != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
+ return;
}
- } else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+
+ if (response != null) {
+ try {
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ assert sendResult != null;
+ if (context != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
+
+ try {
+ sendCallback.onSuccess(sendResult);
+ } catch (Throwable e) {
+ }
+
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ } catch (Exception e) {
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, e, context, false, producer);
+ }
} else {
- MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ if (!responseFuture.isSendRequestOK()) {
+ MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else if (responseFuture.isTimeout()) {
+ MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+ responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else {
+ MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ }
}
}
- }
- });
+ });
+ } catch (RemotingConnectException ex) {
Review comment:
How about changing the _RemotingConnectException_ to _Exception_?
[GitHub] [rocketmq] guyinyou commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-1001855723
![image](https://user-images.githubusercontent.com/36399867/147526351-49216496-01d1-447f-a46e-82ab336265f0.png)
The try-catch approach proposed by @duhenglucky is feasible. It funnels the exceptions into one place for handling.
[GitHub] [rocketmq] guyinyou commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
guyinyou commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r762799809
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
##########
@@ -582,7 +582,7 @@ private SendResult sendDefaultImpl(
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
+ int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1 + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed();
Review comment:
When the request and response fail, the message will be sent (1 + getRetryTimesWhenSendAsyncFailed())^2 times.
[GitHub] [rocketmq] lwclover edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986641891
> Agree, in the asynchronous sending mode, there is no retry when the connection establishment fails, and only when the request and response fail.
Thank you for labeling this as a bug.
[GitHub] [rocketmq] lwclover commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986355533
> In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.
When the check 'if (channel != null && channel.isActive())' evaluates to false, the client throws RemotingConnectException directly, so the callback is never reached.
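The failure mode described in this comment can be sketched as follows. This is only an illustration, not RocketMQ's actual code: `Channel`, `Callback`, `ConnectException`, and `invokeAsync` here are hypothetical stand-ins. It shows that when the channel check fails, the exception is raised synchronously on the caller's thread and the callback is never invoked, so retry logic that lives in the callback does not run.

```java
// Hypothetical stand-ins for illustration; none of these are RocketMQ classes.
final class ChannelCheckSketch {
    interface Callback {
        void onComplete(boolean ok);
    }

    static final class Channel {
        private final boolean active;

        Channel(boolean active) {
            this.active = active;
        }

        boolean isActive() {
            return active;
        }
    }

    static final class ConnectException extends Exception {
        ConnectException(String addr) {
            super("connect to <" + addr + "> failed");
        }
    }

    // Same shape as the check quoted above: a missing or inactive channel
    // fails before any request is written, so the callback is skipped.
    static void invokeAsync(String addr, Channel channel, Callback callback) throws ConnectException {
        if (channel != null && channel.isActive()) {
            // A real client would complete this later on an I/O thread.
            callback.onComplete(true);
        } else {
            throw new ConnectException(addr);
        }
    }

    // Reports what the caller observes: "callback" or "exception".
    static String observe(Channel channel) {
        final String[] seen = {"nothing"};
        try {
            invokeAsync("broker-a", channel, ok -> seen[0] = "callback");
        } catch (ConnectException e) {
            seen[0] = "exception";
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(observe(new Channel(true)));  // prints "callback"
        System.out.println(observe(new Channel(false))); // prints "exception"
        System.out.println(observe(null));               // prints "exception"
    }
}
```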
[GitHub] [rocketmq] guyinyou commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986406481
> > In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.
>
> the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException
I know what you mean, but if you increase timesTotal in "for (; times < timesTotal; times++)", the number of retries will become retryTimesWhenSendAsyncFailed². This is because, in addition to the situation you described, there will also be retryTimesWhenSendAsyncFailed retry attempts inside "this.mQClientFactory.getMQClientAPIImpl().sendMessage".
[GitHub] [rocketmq] coveralls edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448525
[![Coverage Status](https://coveralls.io/builds/45258986/badge)](https://coveralls.io/builds/45258986)
Coverage decreased (-0.1%) to 53.174% when pulling **7bb99a76a729d8d5ef8d651cb4a511d229e82284 on lwclover:develop** into **28d78498544c07524fe70454337862c7c43a781a on apache:develop**.
[GitHub] [rocketmq] lwclover commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775853192
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- long cost = System.currentTimeMillis() - beginStartTime;
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
-
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- if (context != null && sendResult != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- } catch (Throwable e) {
- }
-
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- return;
- }
-
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
+ try {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ long cost = System.currentTimeMillis() - beginStartTime;
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (null == sendCallback && response != null) {
try {
- sendCallback.onSuccess(sendResult);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ if (context != null && sendResult != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
+ return;
}
- } else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+
+ if (response != null) {
+ try {
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ assert sendResult != null;
+ if (context != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
+
+ try {
+ sendCallback.onSuccess(sendResult);
+ } catch (Throwable e) {
+ }
+
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ } catch (Exception e) {
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, e, context, false, producer);
+ }
} else {
- MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ if (!responseFuture.isSendRequestOK()) {
+ MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else if (responseFuture.isTimeout()) {
+ MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+ responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else {
+ MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ }
}
}
- }
- });
+ });
+ } catch (RemotingConnectException ex) {
Review comment:
Catch RemotingConnectException, RemotingSendRequestException, and RemotingTimeoutException; fail fast on RemotingTooMuchRequestException.
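One way to read this suggestion is sketched below. It is a hedged illustration, not the merged change: the exception classes are local stand-ins that only borrow the names from the comment, and `Sender`, `Outcome`, and `trySend` are hypothetical. The three connection, send, and timeout failures are grouped in a single multi-catch and marked as retryable, while the too-many-requests case is reported as a fast failure.

```java
// Local stand-ins that reuse the names from the review comment; these are
// NOT the real org.apache.rocketmq.remoting.exception classes.
final class CatchPolicySketch {
    static class RemotingException extends Exception {
        RemotingException(String message) {
            super(message);
        }
    }

    static final class RemotingConnectException extends RemotingException {
        RemotingConnectException(String message) {
            super(message);
        }
    }

    static final class RemotingSendRequestException extends RemotingException {
        RemotingSendRequestException(String message) {
            super(message);
        }
    }

    static final class RemotingTimeoutException extends RemotingException {
        RemotingTimeoutException(String message) {
            super(message);
        }
    }

    static final class RemotingTooMuchRequestException extends RemotingException {
        RemotingTooMuchRequestException(String message) {
            super(message);
        }
    }

    // Hypothetical send operation that may fail in any of the four ways.
    interface Sender {
        void send() throws RemotingConnectException, RemotingSendRequestException,
                RemotingTimeoutException, RemotingTooMuchRequestException;
    }

    enum Outcome { SENT, RETRY, FAIL_FAST }

    static Outcome trySend(Sender sender) {
        try {
            sender.send();
            return Outcome.SENT;
        } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException e) {
            // Transport-level failures: hand over to the retry path.
            return Outcome.RETRY;
        } catch (RemotingTooMuchRequestException e) {
            // The client is already overloaded: do not retry.
            return Outcome.FAIL_FAST;
        }
    }

    public static void main(String[] args) {
        System.out.println(trySend(() -> { }));
        System.out.println(trySend(() -> { throw new RemotingConnectException("connect failed"); }));
        System.out.println(trySend(() -> { throw new RemotingTooMuchRequestException("too many requests"); }));
    }
}
```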
[GitHub] [rocketmq] odbozhou merged pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
odbozhou merged pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555
[GitHub] [rocketmq] lwclover closed pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover closed pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555
[GitHub] [rocketmq] lwclover commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-995354173
@duhenglucky
[GitHub] [rocketmq] guyinyou commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-1001560359
> > > > In the asynchronous sending mode, the sendCallBack function will be called back after the message fails to be sent, and it can be re-sent inside.
> > >
> > >
> > > the code 'if (channel != null && channel.isActive()) ' return false, it direct throws RemotingConnectException
> >
> >
> > I know what you mean, but if you increase the timesTotal in "for (; times < timesTotal; times++) , the number of retries will become retryTimesWhenSendAsyncFailed² times. Because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at "this.mQClientFactory.getMQClientAPIImpl().sendMessage"
>
> No, after trying retryTimesWhenSendAsyncFailed times, it calls callback.onException method.
![image](https://user-images.githubusercontent.com/36399867/147473982-6e27c7b3-bbe8-4cdd-81e2-7aa1295af8b5.png)
You are right. I looked at the code: asynchronous sending only goes through the for loop again when an exception is thrown. In other cases, it executes once and returns.
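The loop behaviour described in this comment can be illustrated with a small model. It is a sketch under stated assumptions, not the real `sendDefaultImpl`: `Attempt` and `runLoop` are hypothetical, and the only thing modelled is that a loop iteration which hands the message off without throwing returns immediately, while a synchronous exception moves on to the next iteration.

```java
// Hypothetical model of the send loop; names are illustrative only.
final class AsyncLoopSketch {
    interface Attempt {
        void run(int attemptIndex) throws Exception;
    }

    // Returns how many loop iterations were executed.
    static int runLoop(int retries, Attempt attempt) {
        int timesTotal = 1 + retries;
        int times = 0;
        for (; times < timesTotal; times++) {
            try {
                attempt.run(times);
                // Hand-off did not throw: the loop ends after this iteration.
                return times + 1;
            } catch (Exception e) {
                // Synchronous failure (for example a connect failure):
                // fall through to the next iteration.
            }
        }
        return times;
    }

    public static void main(String[] args) {
        // No exception: the loop body runs once.
        System.out.println(runLoop(2, i -> { })); // prints 1
        // Every attempt throws: the loop runs 1 + retries times.
        System.out.println(runLoop(2, i -> { throw new Exception("connect failed"); })); // prints 3
        // Only the first attempt throws: the loop runs twice.
        System.out.println(runLoop(2, i -> {
            if (i == 0) {
                throw new Exception("connect failed");
            }
        })); // prints 2
    }
}
```

In this model the iteration count grows linearly with the retry setting and only when an exception is thrown, which matches the conclusion reached in the comment above.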
[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448661
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3555](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (54f6730) into [develop](https://codecov.io/gh/apache/rocketmq/commit/28d78498544c07524fe70454337862c7c43a781a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28d7849) will **increase** coverage by `0.01%`.
> The diff coverage is `100.00%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3555/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## develop #3555 +/- ##
=============================================
+ Coverage 47.28% 47.30% +0.01%
- Complexity 5031 5035 +4
=============================================
Files 627 627
Lines 41348 41348
Branches 5372 5372
=============================================
+ Hits 19551 19558 +7
- Misses 19369 19370 +1
+ Partials 2428 2420 -8
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...mq/client/impl/producer/DefaultMQProducerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9wcm9kdWNlci9EZWZhdWx0TVFQcm9kdWNlckltcGwuamF2YQ==) | `45.94% <100.00%> (+0.12%)` | :arrow_up: |
| [...rocketmq/broker/filtersrv/FilterServerManager.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvZmlsdGVyc3J2L0ZpbHRlclNlcnZlck1hbmFnZXIuamF2YQ==) | `20.00% <0.00%> (-14.29%)` | :arrow_down: |
| [...ent/impl/consumer/DefaultLitePullConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TGl0ZVB1bGxDb25zdW1lckltcGwuamF2YQ==) | `67.99% <0.00%> (-0.52%)` | :arrow_down: |
| [...a/org/apache/rocketmq/store/StoreStatsService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL1N0b3JlU3RhdHNTZXJ2aWNlLmphdmE=) | `29.31% <0.00%> (ø)` | |
| [...etmq/client/latency/LatencyFaultToleranceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvbGF0ZW5jeS9MYXRlbmN5RmF1bHRUb2xlcmFuY2VJbXBsLmphdmE=) | `50.00% <0.00%> (ø)` | |
| [...e/rocketmq/client/impl/consumer/RebalanceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VJbXBsLmphdmE=) | `42.18% <0.00%> (+0.39%)` | :arrow_up: |
| [...he/rocketmq/client/impl/consumer/ProcessQueue.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Qcm9jZXNzUXVldWUuamF2YQ==) | `61.00% <0.00%> (+0.45%)` | :arrow_up: |
| [...n/java/org/apache/rocketmq/store/ha/HAService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0hBU2VydmljZS5qYXZh) | `71.94% <0.00%> (+0.66%)` | :arrow_up: |
| [...he/rocketmq/client/trace/AsyncTraceDispatcher.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvdHJhY2UvQXN5bmNUcmFjZURpc3BhdGNoZXIuamF2YQ==) | `78.21% <0.00%> (+0.99%)` | :arrow_up: |
| ... and [6 more](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [28d7849...54f6730](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] coveralls edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448525
[![Coverage Status](https://coveralls.io/builds/45256137/badge)](https://coveralls.io/builds/45256137)
Coverage decreased (-0.02%) to 53.268% when pulling **54f673030e176dd1e44bab93e4d4bbe04a9c0b74 on lwclover:develop** into **28d78498544c07524fe70454337862c7c43a781a on apache:develop**.
[GitHub] [rocketmq] guyinyou commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-985380519
In asynchronous sending mode, the sendCallback is invoked after a message fails to send, and the message can be re-sent from inside the callback.
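A minimal sketch of that idea, for illustration only. `Callback` and `FlakySender` are hypothetical stand-ins and not the RocketMQ API, and the sender invokes the callback on the calling thread so the behaviour is easy to follow. It shows an application-level re-send from inside `onException`, bounded by a retry budget:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CallbackResendSketch {
    /** Hypothetical stand-in for a send callback; not the RocketMQ interface. */
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    /** Hypothetical sender that fails its first `failures` calls, then succeeds. */
    static class FlakySender {
        private final AtomicInteger remainingFailures;
        final AtomicInteger attempts = new AtomicInteger();

        FlakySender(int failures) {
            this.remainingFailures = new AtomicInteger(failures);
        }

        void sendAsync(String msg, Callback cb) {
            attempts.incrementAndGet();
            if (remainingFailures.getAndDecrement() > 0) {
                cb.onException(new RuntimeException("send failed"));
            } else {
                cb.onSuccess("OK:" + msg);
            }
        }
    }

    /** Re-sends from inside onException until the re-send budget is used up. */
    static String sendWithResend(final FlakySender sender, final String msg, int maxResends) {
        final String[] outcome = new String[1];
        final AtomicInteger resendsLeft = new AtomicInteger(maxResends);
        Callback cb = new Callback() {
            @Override
            public void onSuccess(String result) {
                outcome[0] = result;
            }

            @Override
            public void onException(Throwable e) {
                if (resendsLeft.getAndDecrement() > 0) {
                    sender.sendAsync(msg, this); // application-level re-send
                } else {
                    outcome[0] = "FAILED:" + e.getMessage();
                }
            }
        };
        sender.sendAsync(msg, cb);
        return outcome[0];
    }
}
```

With a sender that fails twice and a budget of three re-sends, the third attempt succeeds. With a budget smaller than the number of failures, the last exception is what the caller ends up with.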
[GitHub] [rocketmq] lwclover commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-983202497
> ```java
> sendKernelImpl
> ```
That retry applies when the broker is reachable. When the client **cannot connect to the broker**, there is no retry.
[GitHub] [rocketmq] duhenglucky edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
duhenglucky edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-1010579763
@lwclover Could you take a look at the failing test: MQClientAPIImplTest.testSendMessageAsync_WithException:201 » NullPointer
[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448661
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3555](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c424e1a) into [develop](https://codecov.io/gh/apache/rocketmq/commit/28d78498544c07524fe70454337862c7c43a781a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28d7849) will **decrease** coverage by `0.16%`.
> The diff coverage is `35.89%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3555/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## develop #3555 +/- ##
=============================================
- Coverage 47.32% 47.16% -0.17%
+ Complexity 5038 5021 -17
=============================================
Files 627 628 +1
Lines 41348 41411 +63
Branches 5372 5379 +7
=============================================
- Hits 19568 19530 -38
- Misses 19356 19450 +94
- Partials 2424 2431 +7
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `13.87% <35.89%> (+0.99%)` | :arrow_up: |
| [...tmq/logappender/log4j2/RocketmqLog4j2Appender.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bG9nYXBwZW5kZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL2xvZ2FwcGVuZGVyL2xvZzRqMi9Sb2NrZXRtcUxvZzRqMkFwcGVuZGVyLmphdmE=) | `35.00% <0.00%> (-10.00%)` | :arrow_down: |
| [...ketmq/common/protocol/body/ConsumerConnection.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9Db25zdW1lckNvbm5lY3Rpb24uamF2YQ==) | `95.83% <0.00%> (-4.17%)` | :arrow_down: |
| [...lient/impl/consumer/DefaultMQPushConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TVFQdXNoQ29uc3VtZXJJbXBsLmphdmE=) | `39.47% <0.00%> (-2.29%)` | :arrow_down: |
| [...a/org/apache/rocketmq/filter/util/BloomFilter.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CbG9vbUZpbHRlci5qYXZh) | `59.13% <0.00%> (-2.16%)` | :arrow_down: |
| [...rocketmq/client/impl/factory/MQClientInstance.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9mYWN0b3J5L01RQ2xpZW50SW5zdGFuY2UuamF2YQ==) | `51.30% <0.00%> (-2.00%)` | :arrow_down: |
| [...mq/client/impl/consumer/RebalanceLitePullImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VMaXRlUHVsbEltcGwuamF2YQ==) | `48.52% <0.00%> (-1.48%)` | :arrow_down: |
| [...rg/apache/rocketmq/remoting/netty/NettyLogger.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5TG9nZ2VyLmphdmE=) | `14.96% <0.00%> (-1.37%)` | :arrow_down: |
| [...e/rocketmq/client/impl/consumer/RebalanceImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9SZWJhbGFuY2VJbXBsLmphdmE=) | `41.40% <0.00%> (-1.18%)` | :arrow_down: |
| [...nt/impl/consumer/ConsumeMessageOrderlyService.java](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Db25zdW1lTWVzc2FnZU9yZGVybHlTZXJ2aWNlLmphdmE=) | `43.68% <0.00%> (-0.73%)` | :arrow_down: |
| ... and [15 more](https://codecov.io/gh/apache/rocketmq/pull/3555/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [28d7849...c424e1a](https://codecov.io/gh/apache/rocketmq/pull/3555?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [rocketmq] lwclover commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986640212
> > > In asynchronous sending mode, the sendCallback is invoked after a message fails to send, and the message can be re-sent from inside the callback.
> >
> > The check `if (channel != null && channel.isActive())` returns false, so it directly throws RemotingConnectException.
>
> I know what you mean, but if you increase timesTotal in `for (; times < timesTotal; times++)`, the number of retries will become retryTimesWhenSendAsyncFailed² times, because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at `this.mQClientFactory.getMQClientAPIImpl().sendMessage`.
No
[GitHub] [rocketmq] lwclover edited a comment on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover edited a comment on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986640212
> > > In asynchronous sending mode, the sendCallback is invoked after a message fails to send, and the message can be re-sent from inside the callback.
> >
> > The check `if (channel != null && channel.isActive())` returns false, so it directly throws RemotingConnectException.
>
> I know what you mean, but if you increase timesTotal in `for (; times < timesTotal; times++)`, the number of retries will become retryTimesWhenSendAsyncFailed² times, because in addition to the situation you described, there will be retryTimesWhenSendAsyncFailed retry attempts at `this.mQClientFactory.getMQClientAPIImpl().sendMessage`.
No. After trying retryTimesWhenSendAsyncFailed times, it calls the callback.onException method.
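To make the two readings of the retry count concrete, here is a toy model. It is not RocketMQ code: `RetryCountSketch`, `Failure`, and both methods are hypothetical names, and the model rests on the assumption stated in this reply, namely that the inner layer reports exhausted retries to the callback instead of throwing. Whether the real client behaves this way depends on the code paths touched by the PR.

```java
final class RetryCountSketch {
    enum Failure { NONE, CONNECT, RESPONSE }

    /**
     * Counts send attempts when the outer loop retries only on a thrown
     * connect failure, and the inner layer handles response failures itself,
     * ending in callback.onException rather than an exception.
     */
    static int attempts(Failure mode, int outerTotal, int innerRetries) {
        int attempts = 0;
        for (int times = 0; times < outerTotal; times++) {
            try {
                attempts += innerSend(mode, innerRetries);
                break; // inner call returned normally, so the outer loop stops
            } catch (IllegalStateException connectFailure) {
                attempts++; // one failed connect attempt, then loop again
            }
        }
        return attempts;
    }

    /** The feared case: every inner failure also propagates to the outer loop. */
    static int attemptsIfInnerFailurePropagates(int outerTotal, int innerRetries) {
        return outerTotal * (1 + innerRetries);
    }

    private static int innerSend(Failure mode, int innerRetries) {
        if (mode == Failure.CONNECT) {
            throw new IllegalStateException("connect failed");
        }
        if (mode == Failure.RESPONSE) {
            return 1 + innerRetries; // first try plus inner retries, then the callback
        }
        return 1;
    }
}
```

With two retries configured at each layer (`outerTotal = 3`, `innerRetries = 2`), the first model gives 3 attempts for a pure connect failure and 3 for a pure response failure, while the multiplied case gives 9. The disagreement in this thread is about which of the two models matches the client.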
[GitHub] [rocketmq] coveralls commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982448525
[![Coverage Status](https://coveralls.io/builds/44628987/badge)](https://coveralls.io/builds/44628987)
Coverage decreased (-0.009%) to 55.059% when pulling **559d136de355ba93b12dfaabe92ffdf30f154965 on lwclover:develop** into **a82306853d5a5902a180ed07f660e45e4bab588e on apache:develop**.
[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-982503190
Asynchronous retry is here, in class `DefaultMQProducerImpl`, line: 843
```java
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // ...
    // line#843:
    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
        brokerAddr,
        mq.getBrokerName(),
        tmpMessage,
        requestHeader,
        timeout - costTimeAsync,
        communicationMode,
        sendCallback,
        topicPublishInfo,
        this.mQClientFactory,
        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
        context,
        this);
}
```
[GitHub] [rocketmq] lwclover closed pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover closed pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555
[GitHub] [rocketmq] lwclover commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
lwclover commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775845562
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
##########
@@ -523,65 +523,72 @@ private void sendMessageAsync(
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- long cost = System.currentTimeMillis() - beginStartTime;
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
-
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- if (context != null && sendResult != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- } catch (Throwable e) {
- }
-
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- return;
- }
-
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
+ try {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ long cost = System.currentTimeMillis() - beginStartTime;
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (null == sendCallback && response != null) {
try {
- sendCallback.onSuccess(sendResult);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ if (context != null && sendResult != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
+ return;
}
- } else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+
+ if (response != null) {
+ try {
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ assert sendResult != null;
+ if (context != null) {
+ context.setSendResult(sendResult);
+ context.getProducer().executeSendMessageHookAfter(context);
+ }
+
+ try {
+ sendCallback.onSuccess(sendResult);
+ } catch (Throwable e) {
+ }
+
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+ } catch (Exception e) {
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, e, context, false, producer);
+ }
} else {
- MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+ if (!responseFuture.isSendRequestOK()) {
+ MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else if (responseFuture.isTimeout()) {
+ MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+ responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ } else {
+ MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
+ }
}
}
- }
- });
+ });
+ } catch (RemotingConnectException ex) {
Review comment:
RemotingTooMuchRequestException should not be retried.
[GitHub] [rocketmq] Git-Yang commented on pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#issuecomment-986472012
Agreed. In asynchronous sending mode there is no retry when establishing the connection fails; retries happen only when the request or the response fails.
[GitHub] [rocketmq] duhenglucky commented on a change in pull request #3555: [Issue #3556] Fix:When broker is down, rocketmq client can not retry under Async send model
Posted by GitBox <gi...@apache.org>.
duhenglucky commented on a change in pull request #3555:
URL: https://github.com/apache/rocketmq/pull/3555#discussion_r775412260
##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
##########
@@ -582,7 +582,7 @@ private SendResult sendDefaultImpl(
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
+ int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1 + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed();
Review comment:
This is indeed an issue: when there is a network problem with the broker, there is no retry after an asynchronous send fails. However, this change seems to result in too many retries, because the async retry logic lives in the kernel method _org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync_. How about adding a try...catch around _this.remotingClient.invokeAsync_ that handles the exception with the _onExceptionImpl_ logic?
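A rough sketch of the shape of that suggestion, using hypothetical stand-ins only. `Transport`, `ConnectFailedException`, and the `onException` helper are invented for illustration and are not the RocketMQ classes or the code in this PR. The point is that a connect failure thrown synchronously by the async invoke is caught and routed into the same bounded retry path that response failures already use:

```java
import java.util.ArrayList;
import java.util.List;

final class ConnectFailureRetrySketch {
    /** Hypothetical stand-in for a connect failure; not the RocketMQ exception. */
    static class ConnectFailedException extends Exception {
        ConnectFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical transport that throws synchronously if the address is down. */
    interface Transport {
        void invokeAsync(String addr) throws ConnectFailedException;
    }

    final List<String> tried = new ArrayList<>();
    Throwable reported; // what the user callback would finally receive

    void sendAsync(Transport transport, List<String> addrs, int index, int retriesLeft) {
        String addr = addrs.get(index % addrs.size());
        tried.add(addr);
        try {
            transport.invokeAsync(addr);
        } catch (ConnectFailedException ex) {
            // Route the synchronous failure into the shared retry handler
            // instead of letting it escape to the caller.
            onException(transport, addrs, index, retriesLeft, ex);
        }
    }

    void onException(Transport transport, List<String> addrs, int index,
                     int retriesLeft, Exception ex) {
        if (retriesLeft > 0) {
            sendAsync(transport, addrs, index + 1, retriesLeft - 1); // try the next address
        } else {
            reported = ex; // budget exhausted: hand the failure to the callback
        }
    }
}
```

Because the retry budget is decremented in one place, the number of attempts stays at one plus the configured retries, regardless of whether a failure was thrown at connect time or reported later.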