You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/16 06:08:23 UTC

[incubator-inlong] branch master updated: Fixed if an exception is thrown, no response message to client while using order message (#3162)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new de88258  Fixed if an exception is thrown, no response message to client while using order message (#3162)
de88258 is described below

commit de88258b8f4a6809c3fa22bab1ea7cfde2b6113e
Author: baomingyu <ba...@163.com>
AuthorDate: Wed Mar 16 14:08:02 2022 +0800

    Fixed if an exception is thrown, no response message to client while using order message (#3162)
---
 .../java/org/apache/inlong/common/reporpter/AbstractReporter.java  | 5 +++--
 .../org/apache/inlong/common/metric/reporter/ReporterTest.java     | 3 ++-
 inlong-dataproxy/conf/common.properties                            | 2 +-
 .../apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java   | 6 ++++--
 .../org/apache/inlong/dataproxy/source/ServerMessageHandler.java   | 7 ++++++-
 5 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
index 6324eaa..90d733a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
@@ -101,14 +101,15 @@ public class AbstractReporter<T> {
             return null;
         }
         HttpPost httpPost = new HttpPost(serverUrl);
+        String returnStr = null;
         try {
             StringEntity stringEntity = new StringEntity(gson.toJson(data));
             stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
             httpPost.setEntity(stringEntity);
-            String returnStr = executeHttpPost(httpPost);
+            returnStr = executeHttpPost(httpPost);
             return parse(returnStr);
         } catch (Exception e) {
-            LOGGER.error("syncReportData has exception e = {}", e);
+            LOGGER.error("syncReportData has exception returnStr = {}, e:", returnStr, e);
             throw e;
         }
     }
diff --git a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
index e7fdc4f..eadbea2 100644
--- a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
+++ b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
@@ -32,7 +32,8 @@ public class ReporterTest {
 
     @Test
     public void streamConfigLogReporterTest() throws Exception {
-        String serverUrl = "http://127.0.0.1:/8080/openapi/stream/log/reportConfigLogStatus";
+        String serverUrl = "http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log"
+                + "/reportConfigLogStatus";
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         StreamConfigLogReporter streamConfigLogReporter = new StreamConfigLogReporter(httpClient,
                 serverUrl);
diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index 873ade9..0da144a 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -36,5 +36,5 @@ audit.proxys=127.0.0.1:10081
 
 # report config log
 report.config.log.enable=true
-report.config.log.server.url=http://127.0.0.1:8083/openapi/stream/log/reportConfigLogStatus
+report.config.log.server.url=http://127.0.0.1:8083/api/inlong/manager/openapi/stream/log/reportConfigLogStatus
 report.config.log.interval=60000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 64da058..1ea8dc4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -207,7 +207,6 @@ public class PulsarClientService {
                 MessageId msgId = producer.newMessage().key(partitionKey)
                         .properties(proMap).value(event.getBody())
                         .send();
-                sendResponse((OrderEvent)event);
                 sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
                 AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
                 forCallBackP.setCanUseSend(true);
@@ -220,7 +219,7 @@ public class PulsarClientService {
                 forCallBackP.setCanUseSend(false);
                 sendMessageCallBack.handleMessageSendException(topic, es, ex);
             }
-
+            sendResponse((OrderEvent)event);
         } else {
             producer.newMessage().properties(proMap).value(event.getBody())
                     .sendAsync().thenAccept((msgId) -> {
@@ -246,6 +245,9 @@ public class PulsarClientService {
      * @param orderEvent orderEvent
      */
     private void sendResponse(OrderEvent orderEvent) {
+        if ("false".equals(orderEvent.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
+            return;
+        }
         if (orderEvent.getCtx() != null && orderEvent.getCtx().channel().isActive()) {
             orderEvent.getCtx().channel().eventLoop().execute(() -> {
                 ByteBuf binBuffer = MessageUtils.getResponsePackage("",
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 7eb2792..3c2b97c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -442,6 +442,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
                 }
 
+                if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
+                    headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
+                }
+
                 String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND);
                 if (StringUtils.isNotEmpty(syncSend)) {
                     headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend);
@@ -525,7 +529,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             Channel remoteChannel,
             SocketAddress remoteSocketAddress,
             MsgType msgType) throws Exception {
-        if (!commonAttrMap.containsKey("isAck") || "true".equals(commonAttrMap.get("isAck"))) {
+        String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+        if (isAck == null || "true".equals(isAck)) {
             if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
                     .equals(msgType)
                     || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR