You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/16 06:08:23 UTC
[incubator-inlong] branch master updated: Fix missing response to the client when an exception is thrown while sending an order message (#3162)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new de88258 Fix missing response to the client when an exception is thrown while sending an order message (#3162)
de88258 is described below
commit de88258b8f4a6809c3fa22bab1ea7cfde2b6113e
Author: baomingyu <ba...@163.com>
AuthorDate: Wed Mar 16 14:08:02 2022 +0800
Fix missing response to the client when an exception is thrown while sending an order message (#3162)
---
.../java/org/apache/inlong/common/reporpter/AbstractReporter.java | 5 +++--
.../org/apache/inlong/common/metric/reporter/ReporterTest.java | 3 ++-
inlong-dataproxy/conf/common.properties | 2 +-
.../apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java | 6 ++++--
.../org/apache/inlong/dataproxy/source/ServerMessageHandler.java | 7 ++++++-
5 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
index 6324eaa..90d733a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
@@ -101,14 +101,15 @@ public class AbstractReporter<T> {
return null;
}
HttpPost httpPost = new HttpPost(serverUrl);
+ String returnStr = null;
try {
StringEntity stringEntity = new StringEntity(gson.toJson(data));
stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
httpPost.setEntity(stringEntity);
- String returnStr = executeHttpPost(httpPost);
+ returnStr = executeHttpPost(httpPost);
return parse(returnStr);
} catch (Exception e) {
- LOGGER.error("syncReportData has exception e = {}", e);
+ LOGGER.error("syncReportData has exception returnStr = {}, e:", returnStr, e);
throw e;
}
}
diff --git a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
index e7fdc4f..eadbea2 100644
--- a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
+++ b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
@@ -32,7 +32,8 @@ public class ReporterTest {
@Test
public void streamConfigLogReporterTest() throws Exception {
- String serverUrl = "http://127.0.0.1:/8080/openapi/stream/log/reportConfigLogStatus";
+ String serverUrl = "http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log"
+ + "/reportConfigLogStatus";
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
StreamConfigLogReporter streamConfigLogReporter = new StreamConfigLogReporter(httpClient,
serverUrl);
diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index 873ade9..0da144a 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -36,5 +36,5 @@ audit.proxys=127.0.0.1:10081
# report config log
report.config.log.enable=true
-report.config.log.server.url=http://127.0.0.1:8083/openapi/stream/log/reportConfigLogStatus
+report.config.log.server.url=http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log/reportConfigLogStatus
report.config.log.interval=60000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 64da058..1ea8dc4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -207,7 +207,6 @@ public class PulsarClientService {
MessageId msgId = producer.newMessage().key(partitionKey)
.properties(proMap).value(event.getBody())
.send();
- sendResponse((OrderEvent)event);
sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
forCallBackP.setCanUseSend(true);
@@ -220,7 +219,7 @@ public class PulsarClientService {
forCallBackP.setCanUseSend(false);
sendMessageCallBack.handleMessageSendException(topic, es, ex);
}
-
+ sendResponse((OrderEvent)event);
} else {
producer.newMessage().properties(proMap).value(event.getBody())
.sendAsync().thenAccept((msgId) -> {
@@ -246,6 +245,9 @@ public class PulsarClientService {
* @param orderEvent orderEvent
*/
private void sendResponse(OrderEvent orderEvent) {
+ if ("false".equals(orderEvent.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
+ return;
+ }
if (orderEvent.getCtx() != null && orderEvent.getCtx().channel().isActive()) {
orderEvent.getCtx().channel().eventLoop().execute(() -> {
ByteBuf binBuffer = MessageUtils.getResponsePackage("",
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 7eb2792..3c2b97c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -442,6 +442,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
}
+ if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
+ headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
+ }
+
String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND);
if (StringUtils.isNotEmpty(syncSend)) {
headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend);
@@ -525,7 +529,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
Channel remoteChannel,
SocketAddress remoteSocketAddress,
MsgType msgType) throws Exception {
- if (!commonAttrMap.containsKey("isAck") || "true".equals(commonAttrMap.get("isAck"))) {
+ String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+ if (isAck == null || "true".equals(isAck)) {
if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
.equals(msgType)
|| MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR