You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:45 UTC
[inlong] 03/07: [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 1e3212cdb7fe27bdf3140766f3834b80028dacde
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Tue Nov 8 17:06:39 2022 +0800
[INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)
---
.../inlong/common/msg/AttributeConstants.java | 5 +
.../apache/inlong/sdk/dataproxy/SendResult.java | 11 +-
.../inlong/sdk/dataproxy/codec/EncodeObject.java | 154 ++++++++++++++-------
.../inlong/sdk/dataproxy/codec/ErrorCode.java | 52 -------
.../sdk/dataproxy/codec/ProtocolDecoder.java | 38 ++---
.../inlong/sdk/dataproxy/network/Sender.java | 92 ++++++------
6 files changed, 177 insertions(+), 175 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 645de6d4f..6cd635ea7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -89,4 +89,9 @@ public interface AttributeConstants {
// error message, add by receiver
String MESSAGE_PROCESS_ERRMSG = "errMsg";
+
+ String MESSAGE_ID = "messageId";
+
+ // dataproxy IP from dp response ack
+ String MESSAGE_DP_IP = "dpIP";
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
index 10c89af38..75f057b8f 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
@@ -19,14 +19,19 @@
package org.apache.inlong.sdk.dataproxy;
public enum SendResult {
- INVALID_ATTRIBUTES,
+ INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
OK,
TIMEOUT,
CONNECTION_BREAK,
THREAD_INTERRUPT,
ASYNC_CALLBACK_BUFFER_FULL,
NO_CONNECTION,
- INVALID_DATA,
- UNKOWN_ERROR
+ INVALID_DATA, // including DataProxyErrCode(103, 111)
+ BODY_EXCEED_MAX_LEN, // DataProxyErrCode(104)
+ SINK_SERVICE_UNREADY, // DataProxyErrCode(1)
+ UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113)
+ TOPIC_IS_BLANK, // DataProxyErrCode(115)
+ DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120)
+ UNKOWN_ERROR
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index bf50507e2..8634621cf 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -18,12 +18,21 @@
package org.apache.inlong.sdk.dataproxy.codec;
-import java.util.List;
-
+import com.google.common.base.Splitter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class EncodeObject {
- private static final String MESSAGE_ID_PREFIX = "messageId=";
+
+ private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR).trimResults()
+ .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
private byte[] bodyBytes;
private String attributes;
@@ -54,25 +63,24 @@ public class EncodeObject {
private String msgUUID = null;
private EncryptConfigEntry encryptEntry = null;
- private boolean isException = false;
- private ErrorCode exceptionError = null;
+ private SendResult sendResult = SendResult.OK;
+ private String errMsg;
+ private String dpIp;
- /* Used by de_serialization. msgtype=7/8*/
+ /* Used by de_serialization. msgtype=8*/
public EncodeObject() {
}
+ /* Used by de_serialization. msgtype=7*/
+ public EncodeObject(String attributes) {
+ handleAttr(attributes);
+ }
+
/* Used by de_serialization. */
public EncodeObject(byte[] bodyBytes, String attributes) {
this.bodyBytes = bodyBytes;
this.attributes = attributes;
- this.messageId = "";
- String[] tokens = attributes.split("&");
- for (int i = 0; i < tokens.length; i++) {
- if (tokens[i].startsWith("messageId=")) {
- this.messageId = tokens[i].substring(MESSAGE_ID_PREFIX.length(), tokens[i].length());
- break;
- }
- }
+ handleAttr(attributes);
}
/* Used by serialization.But never used */
@@ -85,7 +93,7 @@ public class EncodeObject {
// used for bytes initializtion,msgtype=3/5
public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
- int msgtype, boolean isCompress, final String groupId) {
+ int msgtype, boolean isCompress, final String groupId) {
this.bodyBytes = bodyBytes;
this.messageId = messageId;
this.attributes = attributes + "&messageId=" + messageId;
@@ -96,7 +104,7 @@ public class EncodeObject {
// used for bodylist initializtion,msgtype=3/5
public EncodeObject(List<byte[]> bodyList, String attributes, String messageId,
- int msgtype, boolean isCompress, final String groupId) {
+ int msgtype, boolean isCompress, final String groupId) {
this.bodylist = bodyList;
this.messageId = messageId;
this.attributes = attributes + "&messageId=" + messageId;
@@ -107,8 +115,8 @@ public class EncodeObject {
// used for bytes initializtion,msgtype=7/8
public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport,
- boolean isGroupIdTransfer, long dt, long seqId, String groupId,
- String streamId, String commonattr) {
+ boolean isGroupIdTransfer, long dt, long seqId, String groupId,
+ String streamId, String commonattr) {
this.bodyBytes = bodyBytes;
this.msgtype = msgtype;
this.isCompress = isCompress;
@@ -123,8 +131,8 @@ public class EncodeObject {
// used for bodylist initializtion,msgtype=7/8
public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
- boolean isReport, boolean isGroupIdTransfer, long dt,
- long seqId, String groupId, String streamId, String commonattr) {
+ boolean isReport, boolean isGroupIdTransfer, long dt,
+ long seqId, String groupId, String streamId, String commonattr) {
this.bodylist = bodyList;
this.msgtype = msgtype;
this.isCompress = isCompress;
@@ -139,9 +147,9 @@ public class EncodeObject {
// file agent, used for bytes initializtion,msgtype=7/8
public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
- boolean isReport, boolean isGroupIdTransfer, long dt,
- long seqId, String groupId, String streamId, String commonattr,
- String messageKey, String proxyIp) {
+ boolean isReport, boolean isGroupIdTransfer, long dt,
+ long seqId, String groupId, String streamId, String commonattr,
+ String messageKey, String proxyIp) {
this.bodyBytes = bodyBytes;
this.msgtype = msgtype;
this.isCompress = isCompress;
@@ -158,9 +166,9 @@ public class EncodeObject {
// file agent, used for bodylist initializtion,msgtype=7/8
public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
- boolean isReport, boolean isGroupIdTransfer, long dt,
- long seqId, String groupId, String streamId, String commonattr,
- String messageKey, String proxyIp) {
+ boolean isReport, boolean isGroupIdTransfer, long dt,
+ long seqId, String groupId, String streamId, String commonattr,
+ String messageKey, String proxyIp) {
this.bodylist = bodyList;
this.msgtype = msgtype;
this.isCompress = isCompress;
@@ -175,6 +183,60 @@ public class EncodeObject {
this.proxyIp = proxyIp;
}
+ private void handleAttr(String attributes) {
+ if (StringUtils.isBlank(attributes)) {
+ return;
+ }
+ Map<String, String> backAttrs = new HashMap<>(MAP_SPLITTER.split(attributes));
+ if (backAttrs.containsKey(AttributeConstants.MESSAGE_ID)) {
+ this.messageId = backAttrs.get(AttributeConstants.MESSAGE_ID);
+ }
+ dpIp = backAttrs.get(AttributeConstants.MESSAGE_DP_IP);
+
+ String errCode = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+ // errCode is empty or equals 0 -> success
+ if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
+ this.sendResult = SendResult.OK;
+ } else {
+ // get errMsg
+ this.errMsg = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+ if (StringUtils.isBlank(errMsg)) {
+ this.errMsg = DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg();
+ }
+ //sendResult
+ this.sendResult = convertToSendResult(Integer.parseInt(errCode));
+ }
+ }
+
+ private SendResult convertToSendResult(int errCode) {
+ DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode);
+ switch (dpErrCode) {
+ case SINK_SERVICE_UNREADY:
+ return SendResult.SINK_SERVICE_UNREADY;
+ case MISS_REQUIRED_GROUPID_ARGUMENT:
+ case MISS_REQUIRED_STREAMID_ARGUMENT:
+ case MISS_REQUIRED_DT_ARGUMENT:
+ case UNSUPPORTED_EXTEND_FIELD_VALUE:
+ return SendResult.INVALID_ATTRIBUTES;
+ case MISS_REQUIRED_BODY_ARGUMENT:
+ case EMPTY_MSG:
+ return SendResult.INVALID_DATA;
+ case BODY_EXCEED_MAX_LEN:
+ return SendResult.BODY_EXCEED_MAX_LEN;
+ case UNCONFIGURED_GROUPID_OR_STREAMID:
+ return SendResult.UNCONFIGURED_GROUPID_OR_STREAMID;
+ case PUT_EVENT_TO_CHANNEL_FAILURE:
+ case NO_AVAILABLE_PRODUCER:
+ case PRODUCER_IS_NULL:
+ case SEND_REQUEST_TO_MQ_FAILURE:
+ case MQ_RETURN_ERROR:
+ case DUPLICATED_MESSAGE:
+ return SendResult.DATAPROXY_FAIL_TO_RECEIVE;
+ default:
+ return SendResult.UNKOWN_ERROR;
+ }
+ }
+
public String getMsgUUID() {
return msgUUID;
}
@@ -215,14 +277,6 @@ public class EncodeObject {
this.streamId = streamId;
}
- public void setMsgtype(int msgtype) {
- this.msgtype = msgtype;
- }
-
- public void setBodyBytes(byte[] bodyBytes) {
- this.bodyBytes = bodyBytes;
- }
-
public boolean isReport() {
return isReport;
}
@@ -317,10 +371,18 @@ public class EncodeObject {
return msgtype;
}
+ public void setMsgtype(int msgtype) {
+ this.msgtype = msgtype;
+ }
+
public byte[] getBodyBytes() {
return bodyBytes;
}
+ public void setBodyBytes(byte[] bodyBytes) {
+ this.bodyBytes = bodyBytes;
+ }
+
public String getAttributes() {
return attributes;
}
@@ -361,6 +423,10 @@ public class EncodeObject {
return cnt;
}
+ public void setCnt(int cnt) {
+ this.cnt = cnt;
+ }
+
public int getRealCnt() {
if (bodylist != null) {
return bodylist.size();
@@ -368,23 +434,15 @@ public class EncodeObject {
return 1;
}
- public void setCnt(int cnt) {
- this.cnt = cnt;
- }
-
- public boolean isException() {
- return isException;
- }
-
- public void setException(boolean exception) {
- isException = exception;
+ public String getDpIp() {
+ return dpIp;
}
- public ErrorCode getExceptionError() {
- return exceptionError;
+ public String getErrMsg() {
+ return errMsg;
}
- public void setExceptionError(ErrorCode exceptionError) {
- this.exceptionError = exceptionError;
+ public SendResult getSendResult() {
+ return sendResult;
}
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
deleted file mode 100644
index 3598860a4..000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.codec;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public enum ErrorCode {
-
- ATTR_ERROR(1),
-
- DT_ERROR(2),
-
- COMPRESS_ERROR(3),
-
- OTHER_ERROR(4),
-
- LONG_LENGTH_ERROR(5);
- private final int value;
- private static final Map<Integer, ErrorCode> map = new HashMap<>();
-
- static {
- for (ErrorCode errorCode : ErrorCode.values()) {
- map.put(errorCode.value, errorCode);
- }
- }
-
- ErrorCode(int value) {
- this.value = value;
- }
-
- public static ErrorCode valueOf(int value) {
- return map.get(value);
- }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 579b07b1e..7bf9bd883 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -21,15 +21,15 @@ package org.apache.inlong.sdk.dataproxy.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
-import java.nio.charset.StandardCharsets;
-
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
- private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class);
@Override
protected void decode(ChannelHandlerContext ctx,
@@ -37,9 +37,9 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
buffer.markReaderIndex();
// totallen
int totalLen = buffer.readInt();
- logger.debug("decode totalLen : {}", totalLen);
+ LOGGER.debug("decode totalLen : {}", totalLen);
if (totalLen != buffer.readableBytes()) {
- logger.error("totalLen is not equal readableBytes.total:" + totalLen
+ LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen
+ ";readableBytes:" + buffer.readableBytes());
buffer.resetReaderIndex();
throw new Exception("totalLen is not equal readableBytes.total");
@@ -48,13 +48,13 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
int msgType = buffer.readByte() & 0x1f;
if (msgType == 4) {
- logger.info("debug decode");
+ LOGGER.info("debug decode");
}
if (msgType == 3 | msgType == 5) {
// bodylen
int bodyLength = buffer.readInt();
if (bodyLength >= totalLen) {
- logger.error("bodyLen is greater than totalLen.totalLen:" + totalLen
+ LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen
+ ";bodyLen:" + bodyLength);
buffer.resetReaderIndex();
throw new Exception("bodyLen is greater than totalLen.totalLen");
@@ -72,28 +72,20 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
attrBytes = new byte[attrLength];
buffer.readBytes(attrBytes);
}
- EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes,
- StandardCharsets.UTF_8));
+ EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes, StandardCharsets.UTF_8));
object.setMsgtype(5);
out.add(object);
} else if (msgType == 7) {
int seqId = buffer.readInt();
int attrLen = buffer.readShort();
- EncodeObject object = new EncodeObject();
- object.setMessageId(String.valueOf(seqId));
-
- if (attrLen == 4) {
- int errorValue = buffer.readInt();
- ErrorCode errorCode = ErrorCode.valueOf(errorValue);
- if (errorCode != null) {
- object.setException(true);
- object.setExceptionError(errorCode);
- }
- } else {
- byte[] attrContent = new byte[attrLen];
- buffer.readBytes(attrContent);
+ byte[] attrBytes = null;
+ if (attrLen > 0) {
+ attrBytes = new byte[attrLen];
+ buffer.readBytes(attrBytes);
}
+ EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
+ object.setMessageId(String.valueOf(seqId));
buffer.readShort();
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 49be1ce54..f1eef9978 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -22,10 +22,10 @@ import io.netty.channel.Channel;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.FileCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class Sender {
- private static final Logger logger = LoggerFactory.getLogger(Sender.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
/* Store the callback used by asynchronously message sending. */
private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> callbacks =
@@ -104,12 +104,12 @@ public class Sender {
metricWorker = new MetricWorkerThread(configure, this);
metricWorker.start();
- logger.info("proxy sdk is starting!");
+ LOGGER.info("proxy sdk is starting!");
}
private void checkCallbackList() {
// max wait for 1 min
- logger.info("checking call back list before close, current size is {}",
+ LOGGER.info("checking call back list before close, current size is {}",
currentBufferSize.get());
int count = 0;
try {
@@ -118,10 +118,10 @@ public class Sender {
count += 1;
}
if (currentBufferSize.get() > 0) {
- logger.warn("callback not empty {}, please check it", currentBufferSize.get());
+ LOGGER.warn("callback not empty {}, please check it", currentBufferSize.get());
}
} catch (Exception ex) {
- logger.error("exception while checking callback list", ex);
+ LOGGER.error("exception while checking callback list", ex);
}
}
@@ -141,13 +141,13 @@ public class Sender {
e.printStackTrace(pw);
exceptStr = sw.toString();
} catch (Exception ex) {
- logger.error(getExceptionStack(ex));
+ LOGGER.error(getExceptionStack(ex));
} finally {
try {
pw.close();
sw.close();
} catch (Exception ex) {
- logger.error(getExceptionStack(ex));
+ LOGGER.error(getExceptionStack(ex));
}
}
return exceptStr;
@@ -155,7 +155,7 @@ public class Sender {
/*Used for asynchronously message sending.*/
public void notifyCallback(Channel channel, String messageId, SendResult result) {
- logger.debug("Channel = {} , ack messageId = {}", channel, messageId);
+ LOGGER.debug("Channel = {} , ack messageId = {}", channel, messageId);
if (channel == null) {
return;
}
@@ -178,16 +178,13 @@ public class Sender {
}
}
- private SendResult syncSendInternalMessage(NettyClient client,
- EncodeObject encodeObject, String msgUUID,
- long timeout, TimeUnit timeUnit)
- throws ExecutionException, InterruptedException, TimeoutException {
-
+ private SendResult syncSendInternalMessage(NettyClient client, EncodeObject encodeObject, String msgUUID,
+ long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
if (client == null) {
return SendResult.NO_CONNECTION;
}
if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
- logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+ LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
encodeObject.getAttributes());
return SendResult.INVALID_ATTRIBUTES;
}
@@ -237,17 +234,17 @@ public class Sender {
message = syncSendInternalMessage(client, encodeObject, msgUUID, timeout, timeUnit);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
- logger.error("send message error {} ", getExceptionStack(e));
+ LOGGER.error("send message error {} ", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.THREAD_INTERRUPT;
} catch (ExecutionException e) {
// TODO Auto-generated catch block
- logger.error("ExecutionException {} ", getExceptionStack(e));
+ LOGGER.error("ExecutionException {} ", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.UNKOWN_ERROR;
} catch (TimeoutException e) {
// TODO Auto-generated catch block
- logger.error("TimeoutException {} ", getExceptionStack(e));
+ LOGGER.error("TimeoutException {} ", getExceptionStack(e));
//e.printStackTrace();
SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId());
if (syncMessageCallable != null) {
@@ -255,14 +252,14 @@ public class Sender {
if (tmpClient != null) {
Channel curChannel = tmpClient.getChannel();
if (curChannel != null) {
- logger.error("channel maybe busy {}", curChannel);
+ LOGGER.error("channel maybe busy {}", curChannel);
scanThread.addTimeoutChannel(curChannel);
}
}
}
return SendResult.TIMEOUT;
} catch (Throwable e) {
- logger.error("syncSendMessage exception {} ", getExceptionStack(e));
+ LOGGER.error("syncSendMessage exception {} ", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.UNKOWN_ERROR;
}
@@ -280,8 +277,7 @@ public class Sender {
}
private SendResult syncSendMessageIndexInternal(NettyClient client, EncodeObject encodeObject, String msgUUID,
- long timeout, TimeUnit timeUnit)
- throws ExecutionException, InterruptedException, TimeoutException {
+ long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
if (client == null || !client.isActive()) {
chooseProxy.remove(encodeObject.getMessageId());
client = clientMgr.getClientByRoundRobin();
@@ -336,7 +332,7 @@ public class Sender {
client = clientMgr.getContainProxy(proxyip);
}
if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
- logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+ LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
encodeObject.getAttributes());
return SendResult.INVALID_ATTRIBUTES.toString();
}
@@ -345,17 +341,17 @@ public class Sender {
msgUUID, timeout, timeUnit);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
- logger.error("send message error {}", getExceptionStack(e));
+ LOGGER.error("send message error {}", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.THREAD_INTERRUPT.toString();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
- logger.error("ExecutionException {}", getExceptionStack(e));
+ LOGGER.error("ExecutionException {}", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.UNKOWN_ERROR.toString();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
- logger.error("TimeoutException {}", getExceptionStack(e));
+ LOGGER.error("TimeoutException {}", getExceptionStack(e));
//e.printStackTrace();
SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId());
if (syncMessageCallable != null) {
@@ -363,21 +359,21 @@ public class Sender {
if (tmpClient != null) {
Channel curChannel = tmpClient.getChannel();
if (curChannel != null) {
- logger.error("channel maybe busy {}", curChannel);
+ LOGGER.error("channel maybe busy {}", curChannel);
scanThread.addTimeoutChannel(curChannel);
}
}
}
return SendResult.TIMEOUT.toString();
} catch (Throwable e) {
- logger.error("syncSendMessage exception {}", getExceptionStack(e));
+ LOGGER.error("syncSendMessage exception {}", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.UNKOWN_ERROR.toString();
}
scanThread.resetTimeoutChannel(client.getChannel());
return message.toString() + "=" + client.getServerIP();
} catch (Exception e) {
- logger.error("agent send error {}", getExceptionStack(e));
+ LOGGER.error("agent send error {}", getExceptionStack(e));
syncCallables.remove(encodeObject.getMessageId());
return SendResult.UNKOWN_ERROR.toString();
}
@@ -394,7 +390,7 @@ public class Sender {
* @throws ProxysdkException
*/
public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback callback, String msgUUID, long timeout,
- TimeUnit timeUnit) throws ProxysdkException {
+ TimeUnit timeUnit) throws ProxysdkException {
NettyClient client = chooseProxy.get(encodeObject.getMessageId());
String proxyip = encodeObject.getProxyIp();
if (proxyip != null && proxyip.length() != 0) {
@@ -510,7 +506,7 @@ public class Sender {
* Following methods used by asynchronously message sending.
*/
public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ long timeout, TimeUnit timeUnit) throws ProxysdkException {
metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
@@ -525,7 +521,7 @@ public class Sender {
throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
}
if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
- logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+ LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
encodeObject.getAttributes());
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
@@ -554,7 +550,7 @@ public class Sender {
QueueObject queueObject = msgQueueMap.putIfAbsent(encodeObject.getMessageId(),
new QueueObject(System.currentTimeMillis(), callback, size, timeout, timeUnit));
if (queueObject != null) {
- logger.warn("message id {} has existed.", encodeObject.getMessageId());
+ LOGGER.warn("message id {} has existed.", encodeObject.getMessageId());
}
if (encodeObject.getMsgtype() == 7) {
int groupIdnum = 0;
@@ -584,17 +580,15 @@ public class Sender {
String messageId = response.getMessageId();
chooseProxy.remove(messageId);
SyncMessageCallable callable = syncCallables.remove(messageId);
- SendResult result = response.isException() ? SendResult.INVALID_ATTRIBUTES : SendResult.OK;
+ SendResult result = response.getSendResult();
if (result == SendResult.OK) {
metricWorker.recordSuccessByMessageId(messageId);
+ } else {
+ LOGGER.error("{} exception happens, error message {}", channel, response.getErrMsg());
}
if (callable != null) { // for syncSend
callable.update(result);
}
- if (response.isException()) {
- logger.error("{} exception happens, error message {}", channel,
- response.getExceptionError());
- }
notifyCallback(channel, messageId, result); // for asyncSend
}
@@ -606,7 +600,7 @@ public class Sender {
if (channel == null) {
return;
}
- logger.info("channel {} connection is disconnected!", channel);
+ LOGGER.info("channel {} connection is disconnected!", channel);
try {
ConcurrentHashMap<String, QueueObject> msgQueueMap = callbacks.remove(channel);
if (msgQueueMap != null) {
@@ -627,7 +621,7 @@ public class Sender {
msgQueueMap.clear();
}
} catch (Throwable e2) {
- logger.info("process channel {} disconnected callbacks throw error,", channel, e2);
+ LOGGER.info("process channel {} disconnected callbacks throw error,", channel, e2);
}
try {
@@ -654,7 +648,7 @@ public class Sender {
}
}
} catch (Throwable e) {
- logger.info("process channel {} disconnected syncCallables throw error,", channel, e);
+ LOGGER.info("process channel {} disconnected syncCallables throw error,", channel, e);
}
}
@@ -663,28 +657,28 @@ public class Sender {
if (channel == null) {
return;
}
- logger.info("wait for ack for channel {}", channel);
+ LOGGER.info("wait for ack for channel {}", channel);
try {
ConcurrentHashMap<String, QueueObject> queueObjMap = callbacks.get(channel);
if (queueObjMap != null) {
while (true) {
if (queueObjMap.isEmpty()) {
- logger.info("this channel {} is empty!", channel);
+ LOGGER.info("this channel {} is empty!", channel);
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
- logger.error("wait for ack for channel {}, error {}",
+ LOGGER.error("wait for ack for channel {}, error {}",
channel, e.getMessage());
e.printStackTrace();
}
}
}
- logger.info("waitForAckForChannel finished , channel is {}", channel);
+ LOGGER.info("waitForAckForChannel finished , channel is {}", channel);
} catch (Throwable e) {
- logger.error("waitForAckForChannel exception, channel is {}", channel, e);
+ LOGGER.error("waitForAckForChannel exception, channel is {}", channel, e);
}
}