You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:45 UTC

[inlong] 03/07: [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1e3212cdb7fe27bdf3140766f3834b80028dacde
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Tue Nov 8 17:06:39 2022 +0800

    [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)
---
 .../inlong/common/msg/AttributeConstants.java      |   5 +
 .../apache/inlong/sdk/dataproxy/SendResult.java    |  11 +-
 .../inlong/sdk/dataproxy/codec/EncodeObject.java   | 154 ++++++++++++++-------
 .../inlong/sdk/dataproxy/codec/ErrorCode.java      |  52 -------
 .../sdk/dataproxy/codec/ProtocolDecoder.java       |  38 ++---
 .../inlong/sdk/dataproxy/network/Sender.java       |  92 ++++++------
 6 files changed, 177 insertions(+), 175 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 645de6d4f..6cd635ea7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -89,4 +89,9 @@ public interface AttributeConstants {
 
     // error message, add by receiver
     String MESSAGE_PROCESS_ERRMSG = "errMsg";
+
+    String MESSAGE_ID = "messageId";
+
+    // dataproxy IP from dp response ack
+    String MESSAGE_DP_IP = "dpIP";
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
index 10c89af38..75f057b8f 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
@@ -19,14 +19,19 @@
 package org.apache.inlong.sdk.dataproxy;
 
 public enum SendResult {
-    INVALID_ATTRIBUTES,
+    INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
     OK,
     TIMEOUT,
     CONNECTION_BREAK,
     THREAD_INTERRUPT,
     ASYNC_CALLBACK_BUFFER_FULL,
     NO_CONNECTION,
-    INVALID_DATA,
-    UNKOWN_ERROR
+    INVALID_DATA, // including DataProxyErrCode(103, 111)
+    BODY_EXCEED_MAX_LEN, // DataProxyErrCode(104)
+    SINK_SERVICE_UNREADY, // DataProxyErrCode(1)
+    UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113)
+    TOPIC_IS_BLANK, // DataProxyErrCode(115)
+    DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120)
 
+    UNKOWN_ERROR
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index bf50507e2..8634621cf 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -18,12 +18,21 @@
 
 package org.apache.inlong.sdk.dataproxy.codec;
 
-import java.util.List;
-
+import com.google.common.base.Splitter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class EncodeObject {
-    private static final String MESSAGE_ID_PREFIX = "messageId=";
+
+    private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR).trimResults()
+            .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
     private byte[] bodyBytes;
     private String attributes;
@@ -54,25 +63,24 @@ public class EncodeObject {
     private String msgUUID = null;
     private EncryptConfigEntry encryptEntry = null;
 
-    private boolean isException = false;
-    private ErrorCode exceptionError = null;
+    private SendResult sendResult = SendResult.OK;
+    private String errMsg;
+    private String dpIp;
 
-    /* Used by de_serialization. msgtype=7/8*/
+    /* Used by de_serialization. msgtype=8*/
     public EncodeObject() {
     }
 
+    /* Used by de_serialization. msgtype=7*/
+    public EncodeObject(String attributes) {
+        handleAttr(attributes);
+    }
+
     /* Used by de_serialization. */
     public EncodeObject(byte[] bodyBytes, String attributes) {
         this.bodyBytes = bodyBytes;
         this.attributes = attributes;
-        this.messageId = "";
-        String[] tokens = attributes.split("&");
-        for (int i = 0; i < tokens.length; i++) {
-            if (tokens[i].startsWith("messageId=")) {
-                this.messageId = tokens[i].substring(MESSAGE_ID_PREFIX.length(), tokens[i].length());
-                break;
-            }
-        }
+        handleAttr(attributes);
     }
 
     /* Used by serialization.But never used */
@@ -85,7 +93,7 @@ public class EncodeObject {
 
     // used for bytes initializtion,msgtype=3/5
     public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
-                        int msgtype, boolean isCompress, final String groupId) {
+            int msgtype, boolean isCompress, final String groupId) {
         this.bodyBytes = bodyBytes;
         this.messageId = messageId;
         this.attributes = attributes + "&messageId=" + messageId;
@@ -96,7 +104,7 @@ public class EncodeObject {
 
     // used for bodylist initializtion,msgtype=3/5
     public EncodeObject(List<byte[]> bodyList, String attributes, String messageId,
-                        int msgtype, boolean isCompress, final String groupId) {
+            int msgtype, boolean isCompress, final String groupId) {
         this.bodylist = bodyList;
         this.messageId = messageId;
         this.attributes = attributes + "&messageId=" + messageId;
@@ -107,8 +115,8 @@ public class EncodeObject {
 
     // used for bytes initializtion,msgtype=7/8
     public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport,
-                        boolean isGroupIdTransfer, long dt, long seqId, String groupId,
-                        String streamId, String commonattr) {
+            boolean isGroupIdTransfer, long dt, long seqId, String groupId,
+            String streamId, String commonattr) {
         this.bodyBytes = bodyBytes;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -123,8 +131,8 @@ public class EncodeObject {
 
     // used for bodylist initializtion,msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isGroupIdTransfer, long dt,
-                        long seqId, String groupId, String streamId, String commonattr) {
+            boolean isReport, boolean isGroupIdTransfer, long dt,
+            long seqId, String groupId, String streamId, String commonattr) {
         this.bodylist = bodyList;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -139,9 +147,9 @@ public class EncodeObject {
 
     // file agent, used for bytes initializtion,msgtype=7/8
     public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isGroupIdTransfer, long dt,
-                        long seqId, String groupId, String streamId, String commonattr,
-                        String messageKey, String proxyIp) {
+            boolean isReport, boolean isGroupIdTransfer, long dt,
+            long seqId, String groupId, String streamId, String commonattr,
+            String messageKey, String proxyIp) {
         this.bodyBytes = bodyBytes;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -158,9 +166,9 @@ public class EncodeObject {
 
     // file agent, used for bodylist initializtion,msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isGroupIdTransfer, long dt,
-                        long seqId, String groupId, String streamId, String commonattr,
-                        String messageKey, String proxyIp) {
+            boolean isReport, boolean isGroupIdTransfer, long dt,
+            long seqId, String groupId, String streamId, String commonattr,
+            String messageKey, String proxyIp) {
         this.bodylist = bodyList;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -175,6 +183,60 @@ public class EncodeObject {
         this.proxyIp = proxyIp;
     }
 
+    /* Parses ack attributes; malformed attributes or errCode yield UNKOWN_ERROR instead of an exception. */
+    private void handleAttr(String attributes) {
+        if (StringUtils.isBlank(attributes)) {
+            return;
+        }
+        try {
+            Map<String, String> backAttrs = new HashMap<>(MAP_SPLITTER.split(attributes));
+            this.messageId = backAttrs.getOrDefault(AttributeConstants.MESSAGE_ID, this.messageId);
+            this.dpIp = backAttrs.get(AttributeConstants.MESSAGE_DP_IP);
+            String errCode = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+            int code = StringUtils.isBlank(errCode) ? 0 : Integer.parseInt(errCode); // blank or 0 -> success
+            if (code != 0) {
+                String ackMsg = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+                this.errMsg = StringUtils.isBlank(ackMsg) ? DataProxyErrCode.valueOf(code).getErrMsg() : ackMsg;
+                this.sendResult = convertToSendResult(code);
+            }
+        } catch (IllegalArgumentException e) {
+            // thrown by MapSplitter on malformed/duplicate entries and (as NumberFormatException) by parseInt
+            this.sendResult = SendResult.UNKOWN_ERROR;
+            this.errMsg = "invalid ack attributes: " + attributes;
+        }
+    }
+
+    /* Maps a DataProxy error code to the SDK-level SendResult; unlisted codes become UNKOWN_ERROR. */
+    private SendResult convertToSendResult(int errCode) {
+        switch (DataProxyErrCode.valueOf(errCode)) {
+            case SINK_SERVICE_UNREADY:
+                return SendResult.SINK_SERVICE_UNREADY;
+            case MISS_REQUIRED_GROUPID_ARGUMENT:
+            case MISS_REQUIRED_STREAMID_ARGUMENT:
+            case MISS_REQUIRED_DT_ARGUMENT:
+            case UNSUPPORTED_EXTEND_FIELD_VALUE:
+                return SendResult.INVALID_ATTRIBUTES;
+            case MISS_REQUIRED_BODY_ARGUMENT:
+            case EMPTY_MSG:
+                return SendResult.INVALID_DATA;
+            case BODY_EXCEED_MAX_LEN:
+                return SendResult.BODY_EXCEED_MAX_LEN;
+            case UNCONFIGURED_GROUPID_OR_STREAMID:
+                return SendResult.UNCONFIGURED_GROUPID_OR_STREAMID;
+            case TOPIC_IS_BLANK: // DataProxyErrCode(115)
+                return SendResult.TOPIC_IS_BLANK;
+            case PUT_EVENT_TO_CHANNEL_FAILURE:
+            case NO_AVAILABLE_PRODUCER:
+            case PRODUCER_IS_NULL:
+            case SEND_REQUEST_TO_MQ_FAILURE:
+            case MQ_RETURN_ERROR:
+            case DUPLICATED_MESSAGE:
+                return SendResult.DATAPROXY_FAIL_TO_RECEIVE;
+            default:
+                return SendResult.UNKOWN_ERROR;
+        }
+    }
+
     public String getMsgUUID() {
         return msgUUID;
     }
@@ -215,14 +277,6 @@ public class EncodeObject {
         this.streamId = streamId;
     }
 
-    public void setMsgtype(int msgtype) {
-        this.msgtype = msgtype;
-    }
-
-    public void setBodyBytes(byte[] bodyBytes) {
-        this.bodyBytes = bodyBytes;
-    }
-
     public boolean isReport() {
         return isReport;
     }
@@ -317,10 +371,18 @@ public class EncodeObject {
         return msgtype;
     }
 
+    public void setMsgtype(int msgtype) {
+        this.msgtype = msgtype;
+    }
+
     public byte[] getBodyBytes() {
         return bodyBytes;
     }
 
+    public void setBodyBytes(byte[] bodyBytes) {
+        this.bodyBytes = bodyBytes;
+    }
+
     public String getAttributes() {
         return attributes;
     }
@@ -361,6 +423,10 @@ public class EncodeObject {
         return cnt;
     }
 
+    public void setCnt(int cnt) {
+        this.cnt = cnt;
+    }
+
     public int getRealCnt() {
         if (bodylist != null) {
             return bodylist.size();
@@ -368,23 +434,15 @@ public class EncodeObject {
         return 1;
     }
 
-    public void setCnt(int cnt) {
-        this.cnt = cnt;
-    }
-
-    public boolean isException() {
-        return isException;
-    }
-
-    public void setException(boolean exception) {
-        isException = exception;
+    public String getDpIp() {
+        return dpIp;
     }
 
-    public ErrorCode getExceptionError() {
-        return exceptionError;
+    public String getErrMsg() {
+        return errMsg;
     }
 
-    public void setExceptionError(ErrorCode exceptionError) {
-        this.exceptionError = exceptionError;
+    public SendResult getSendResult() {
+        return sendResult;
     }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
deleted file mode 100644
index 3598860a4..000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.codec;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public enum ErrorCode {
-
-    ATTR_ERROR(1),
-
-    DT_ERROR(2),
-
-    COMPRESS_ERROR(3),
-
-    OTHER_ERROR(4),
-
-    LONG_LENGTH_ERROR(5);
-    private final int value;
-    private static final Map<Integer, ErrorCode> map = new HashMap<>();
-
-    static {
-        for (ErrorCode errorCode : ErrorCode.values()) {
-            map.put(errorCode.value, errorCode);
-        }
-    }
-
-    ErrorCode(int value) {
-        this.value = value;
-    }
-
-    public static ErrorCode valueOf(int value) {
-        return map.get(value);
-    }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 579b07b1e..7bf9bd883 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -21,15 +21,15 @@ package org.apache.inlong.sdk.dataproxy.codec;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import java.nio.charset.StandardCharsets;
-
-import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
 public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class);
 
     @Override
     protected void decode(ChannelHandlerContext ctx,
@@ -37,9 +37,9 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
         buffer.markReaderIndex();
         // totallen
         int totalLen = buffer.readInt();
-        logger.debug("decode totalLen : {}", totalLen);
+        LOGGER.debug("decode totalLen : {}", totalLen);
         if (totalLen != buffer.readableBytes()) {
-            logger.error("totalLen is not equal readableBytes.total:" + totalLen
+            LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen
                     + ";readableBytes:" + buffer.readableBytes());
             buffer.resetReaderIndex();
             throw new Exception("totalLen is not equal readableBytes.total");
@@ -48,13 +48,13 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
         int msgType = buffer.readByte() & 0x1f;
 
         if (msgType == 4) {
-            logger.info("debug decode");
+            LOGGER.info("debug decode");
         }
         if (msgType == 3 | msgType == 5) {
             // bodylen
             int bodyLength = buffer.readInt();
             if (bodyLength >= totalLen) {
-                logger.error("bodyLen is greater than totalLen.totalLen:" + totalLen
+                LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen
                         + ";bodyLen:" + bodyLength);
                 buffer.resetReaderIndex();
                 throw new Exception("bodyLen is greater than totalLen.totalLen");
@@ -72,28 +72,20 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
                 attrBytes = new byte[attrLength];
                 buffer.readBytes(attrBytes);
             }
-            EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes,
-                    StandardCharsets.UTF_8));
+            EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes, StandardCharsets.UTF_8));
             object.setMsgtype(5);
             out.add(object);
         } else if (msgType == 7) {
 
             int seqId = buffer.readInt();
             int attrLen = buffer.readShort();
-            EncodeObject object = new EncodeObject();
-            object.setMessageId(String.valueOf(seqId));
-
-            if (attrLen == 4) {
-                int errorValue = buffer.readInt();
-                ErrorCode errorCode = ErrorCode.valueOf(errorValue);
-                if (errorCode != null) {
-                    object.setException(true);
-                    object.setExceptionError(errorCode);
-                }
-            } else {
-                byte[] attrContent = new byte[attrLen];
-                buffer.readBytes(attrContent);
+            // an ack may carry no attributes (attrLen <= 0): decode an empty string instead of passing null
+            byte[] attrBytes = new byte[Math.max(attrLen, 0)];
+            if (attrLen > 0) {
+                buffer.readBytes(attrBytes);
+            }
+            EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
+            object.setMessageId(String.valueOf(seqId));
 
             buffer.readShort();
 
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 49be1ce54..f1eef9978 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -22,10 +22,10 @@ import io.netty.channel.Channel;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.FileCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public class Sender {
 
-    private static final Logger logger = LoggerFactory.getLogger(Sender.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
 
     /* Store the callback used by asynchronously message sending. */
     private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> callbacks =
@@ -104,12 +104,12 @@ public class Sender {
 
         metricWorker = new MetricWorkerThread(configure, this);
         metricWorker.start();
-        logger.info("proxy sdk is starting!");
+        LOGGER.info("proxy sdk is starting!");
     }
 
     private void checkCallbackList() {
         // max wait for 1 min
-        logger.info("checking call back list before close, current size is {}",
+        LOGGER.info("checking call back list before close, current size is {}",
                 currentBufferSize.get());
         int count = 0;
         try {
@@ -118,10 +118,10 @@ public class Sender {
                 count += 1;
             }
             if (currentBufferSize.get() > 0) {
-                logger.warn("callback not empty {}, please check it", currentBufferSize.get());
+                LOGGER.warn("callback not empty {}, please check it", currentBufferSize.get());
             }
         } catch (Exception ex) {
-            logger.error("exception while checking callback list", ex);
+            LOGGER.error("exception while checking callback list", ex);
         }
     }
 
@@ -141,13 +141,13 @@ public class Sender {
             e.printStackTrace(pw);
             exceptStr = sw.toString();
         } catch (Exception ex) {
-            logger.error(getExceptionStack(ex));
+            LOGGER.error(getExceptionStack(ex));
         } finally {
             try {
                 pw.close();
                 sw.close();
             } catch (Exception ex) {
-                logger.error(getExceptionStack(ex));
+                LOGGER.error(getExceptionStack(ex));
             }
         }
         return exceptStr;
@@ -155,7 +155,7 @@ public class Sender {
 
     /*Used for asynchronously message sending.*/
     public void notifyCallback(Channel channel, String messageId, SendResult result) {
-        logger.debug("Channel = {} , ack messageId = {}", channel, messageId);
+        LOGGER.debug("Channel = {} , ack messageId = {}", channel, messageId);
         if (channel == null) {
             return;
         }
@@ -178,16 +178,13 @@ public class Sender {
         }
     }
 
-    private SendResult syncSendInternalMessage(NettyClient client,
-                                               EncodeObject encodeObject, String msgUUID,
-                                               long timeout, TimeUnit timeUnit)
-            throws ExecutionException, InterruptedException, TimeoutException {
-
+    private SendResult syncSendInternalMessage(NettyClient client, EncodeObject encodeObject, String msgUUID,
+            long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
         if (client == null) {
             return SendResult.NO_CONNECTION;
         }
         if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
-            logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+            LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
                     encodeObject.getAttributes());
             return SendResult.INVALID_ATTRIBUTES;
         }
@@ -237,17 +234,17 @@ public class Sender {
             message = syncSendInternalMessage(client, encodeObject, msgUUID, timeout, timeUnit);
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
-            logger.error("send message error {} ", getExceptionStack(e));
+            LOGGER.error("send message error {} ", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.THREAD_INTERRUPT;
         } catch (ExecutionException e) {
             // TODO Auto-generated catch block
-            logger.error("ExecutionException {} ", getExceptionStack(e));
+            LOGGER.error("ExecutionException {} ", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.UNKOWN_ERROR;
         } catch (TimeoutException e) {
             // TODO Auto-generated catch block
-            logger.error("TimeoutException {} ", getExceptionStack(e));
+            LOGGER.error("TimeoutException {} ", getExceptionStack(e));
             //e.printStackTrace();
             SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId());
             if (syncMessageCallable != null) {
@@ -255,14 +252,14 @@ public class Sender {
                 if (tmpClient != null) {
                     Channel curChannel = tmpClient.getChannel();
                     if (curChannel != null) {
-                        logger.error("channel maybe busy {}", curChannel);
+                        LOGGER.error("channel maybe busy {}", curChannel);
                         scanThread.addTimeoutChannel(curChannel);
                     }
                 }
             }
             return SendResult.TIMEOUT;
         } catch (Throwable e) {
-            logger.error("syncSendMessage exception {} ", getExceptionStack(e));
+            LOGGER.error("syncSendMessage exception {} ", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.UNKOWN_ERROR;
         }
@@ -280,8 +277,7 @@ public class Sender {
     }
 
     private SendResult syncSendMessageIndexInternal(NettyClient client, EncodeObject encodeObject, String msgUUID,
-                                                    long timeout, TimeUnit timeUnit)
-            throws ExecutionException, InterruptedException, TimeoutException {
+            long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
         if (client == null || !client.isActive()) {
             chooseProxy.remove(encodeObject.getMessageId());
             client = clientMgr.getClientByRoundRobin();
@@ -336,7 +332,7 @@ public class Sender {
                 client = clientMgr.getContainProxy(proxyip);
             }
             if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
-                logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+                LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
                         encodeObject.getAttributes());
                 return SendResult.INVALID_ATTRIBUTES.toString();
             }
@@ -345,17 +341,17 @@ public class Sender {
                         msgUUID, timeout, timeUnit);
             } catch (InterruptedException e) {
                 // TODO Auto-generated catch block
-                logger.error("send message error {}", getExceptionStack(e));
+                LOGGER.error("send message error {}", getExceptionStack(e));
                 syncCallables.remove(encodeObject.getMessageId());
                 return SendResult.THREAD_INTERRUPT.toString();
             } catch (ExecutionException e) {
                 // TODO Auto-generated catch block
-                logger.error("ExecutionException {}", getExceptionStack(e));
+                LOGGER.error("ExecutionException {}", getExceptionStack(e));
                 syncCallables.remove(encodeObject.getMessageId());
                 return SendResult.UNKOWN_ERROR.toString();
             } catch (TimeoutException e) {
                 // TODO Auto-generated catch block
-                logger.error("TimeoutException {}", getExceptionStack(e));
+                LOGGER.error("TimeoutException {}", getExceptionStack(e));
                 //e.printStackTrace();
                 SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId());
                 if (syncMessageCallable != null) {
@@ -363,21 +359,21 @@ public class Sender {
                     if (tmpClient != null) {
                         Channel curChannel = tmpClient.getChannel();
                         if (curChannel != null) {
-                            logger.error("channel maybe busy {}", curChannel);
+                            LOGGER.error("channel maybe busy {}", curChannel);
                             scanThread.addTimeoutChannel(curChannel);
                         }
                     }
                 }
                 return SendResult.TIMEOUT.toString();
             } catch (Throwable e) {
-                logger.error("syncSendMessage exception {}", getExceptionStack(e));
+                LOGGER.error("syncSendMessage exception {}", getExceptionStack(e));
                 syncCallables.remove(encodeObject.getMessageId());
                 return SendResult.UNKOWN_ERROR.toString();
             }
             scanThread.resetTimeoutChannel(client.getChannel());
             return message.toString() + "=" + client.getServerIP();
         } catch (Exception e) {
-            logger.error("agent send error {}", getExceptionStack(e));
+            LOGGER.error("agent send error {}", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.UNKOWN_ERROR.toString();
         }
@@ -394,7 +390,7 @@ public class Sender {
      * @throws ProxysdkException
      */
     public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback callback, String msgUUID, long timeout,
-                                      TimeUnit timeUnit) throws ProxysdkException {
+            TimeUnit timeUnit) throws ProxysdkException {
         NettyClient client = chooseProxy.get(encodeObject.getMessageId());
         String proxyip = encodeObject.getProxyIp();
         if (proxyip != null && proxyip.length() != 0) {
@@ -510,7 +506,7 @@ public class Sender {
      * Following methods used by asynchronously message sending.
      */
     public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
                 encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
                 encodeObject.getDt(), encodeObject.getRealCnt());
@@ -525,7 +521,7 @@ public class Sender {
             throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
         }
         if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
-            logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+            LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
                     encodeObject.getAttributes());
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
@@ -554,7 +550,7 @@ public class Sender {
         QueueObject queueObject = msgQueueMap.putIfAbsent(encodeObject.getMessageId(),
                 new QueueObject(System.currentTimeMillis(), callback, size, timeout, timeUnit));
         if (queueObject != null) {
-            logger.warn("message id {} has existed.", encodeObject.getMessageId());
+            LOGGER.warn("message id {} has existed.", encodeObject.getMessageId());
         }
         if (encodeObject.getMsgtype() == 7) {
             int groupIdnum = 0;
@@ -584,17 +580,15 @@ public class Sender {
         String messageId = response.getMessageId();
         chooseProxy.remove(messageId);
         SyncMessageCallable callable = syncCallables.remove(messageId);
-        SendResult result = response.isException() ? SendResult.INVALID_ATTRIBUTES : SendResult.OK;
+        SendResult result = response.getSendResult();
         if (result == SendResult.OK) {
             metricWorker.recordSuccessByMessageId(messageId);
+        } else {
+            LOGGER.error("{} exception happens, error message {}", channel, response.getErrMsg());
         }
         if (callable != null) { // for syncSend
             callable.update(result);
         }
-        if (response.isException()) {
-            logger.error("{} exception happens, error message {}", channel,
-                    response.getExceptionError());
-        }
         notifyCallback(channel, messageId, result); // for asyncSend
     }
 
@@ -606,7 +600,7 @@ public class Sender {
         if (channel == null) {
             return;
         }
-        logger.info("channel {} connection is disconnected!", channel);
+        LOGGER.info("channel {} connection is disconnected!", channel);
         try {
             ConcurrentHashMap<String, QueueObject> msgQueueMap = callbacks.remove(channel);
             if (msgQueueMap != null) {
@@ -627,7 +621,7 @@ public class Sender {
                 msgQueueMap.clear();
             }
         } catch (Throwable e2) {
-            logger.info("process channel {} disconnected callbacks throw error,", channel, e2);
+            LOGGER.info("process channel {} disconnected callbacks throw error,", channel, e2);
         }
 
         try {
@@ -654,7 +648,7 @@ public class Sender {
                 }
             }
         } catch (Throwable e) {
-            logger.info("process channel {} disconnected syncCallables throw error,", channel, e);
+            LOGGER.info("process channel {} disconnected syncCallables throw error,", channel, e);
         }
     }
 
@@ -663,28 +657,28 @@ public class Sender {
         if (channel == null) {
             return;
         }
-        logger.info("wait for ack for channel {}", channel);
+        LOGGER.info("wait for ack for channel {}", channel);
         try {
             ConcurrentHashMap<String, QueueObject> queueObjMap = callbacks.get(channel);
             if (queueObjMap != null) {
                 while (true) {
                     if (queueObjMap.isEmpty()) {
-                        logger.info("this channel {} is empty!", channel);
+                        LOGGER.info("this channel {} is empty!", channel);
                         break;
                     }
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         // TODO Auto-generated catch block
-                        logger.error("wait for ack for channel {}, error {}",
+                        LOGGER.error("wait for ack for channel {}, error {}",
                                 channel, e.getMessage());
                         e.printStackTrace();
                     }
                 }
             }
-            logger.info("waitForAckForChannel finished , channel is {}", channel);
+            LOGGER.info("waitForAckForChannel finished , channel is {}", channel);
         } catch (Throwable e) {
-            logger.error("waitForAckForChannel exception, channel is {}", channel, e);
+            LOGGER.error("waitForAckForChannel exception, channel is {}", channel, e);
         }
     }