You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/04/19 02:10:14 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-73] remove
duplicate codes & some minor updates (#56)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new f694bfd [TUBEMQ-73] remove duplicate codes & some minor updates (#56)
f694bfd is described below
commit f694bfd61e0391263f5adb9c4c5823be0d9c6d35
Author: Tboy <gu...@immomo.com>
AuthorDate: Sun Apr 19 10:10:07 2020 +0800
[TUBEMQ-73] remove duplicate codes & some minor updates (#56)
---
.../tubemq/server/broker/BrokerServiceServer.java | 48 ++++++++--------------
1 file changed, 18 insertions(+), 30 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 0ab7c39..170ea2e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -555,18 +555,18 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
.append(topicName).append(")!\"}");
return sb;
} else {
- List<String> transferedMessageList = new ArrayList<String>();
+ List<String> transferMessageList = new ArrayList<String>();
List<TransferedMessage> tmpMsgList = getMessageResult.transferedMessageList;
List<Message> messageList = DataConverterUtil.convertMessage(topicName, tmpMsgList);
int startPos = messageList.size() - msgCount < 0 ? 0 : messageList.size() - msgCount;
for (; startPos < messageList.size(); startPos++) {
String msgItem = new String(
Base64.encodeBase64(messageList.get(startPos).getData()));
- transferedMessageList.add(msgItem);
+ transferMessageList.add(msgItem);
}
int i = 0;
sb.append("{\"result\":true,\"errCode\":200,\"errMsg\":\"Success!\",\"dataSet\":[");
- for (String msgData : transferedMessageList) {
+ for (String msgData : transferMessageList) {
if (i > 0) {
sb.append(",");
}
@@ -651,15 +651,13 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
return builder.build();
}
int checkSum = CheckSum.crc32(msgData);
- if (request.getCheckSum() != -1) {
- if (checkSum != request.getCheckSum()) {
- builder.setErrCode(TErrCodeConstants.FORBIDDEN);
- builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
- .append(request.getCheckSum()).append(" of ").append(reqTopic)
- .append(" not equal to the data's checksum of ")
- .append(checkSum).toString());
- return builder.build();
- }
+ if (request.getCheckSum() != -1 && checkSum != request.getCheckSum()) {
+ builder.setErrCode(TErrCodeConstants.FORBIDDEN);
+ builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
+ .append(request.getCheckSum()).append(" of ").append(reqTopic)
+ .append(" not equal to the data's checksum of ")
+ .append(checkSum).toString());
+ return builder.build();
}
CertifiedResult authorizeResult =
serverAuthHandler.validProduceAuthorizeInfo(
@@ -693,23 +691,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
.append(", server receive message overflow!").toString());
return builder.build();
}
- } catch (final Exception e) {
- logger.error("Put message failed ", e);
+ } catch (final Throwable ex) {
+ logger.error("Put message failed ", ex);
strBuffer.delete(0, strBuffer.length());
builder.setSuccess(false);
builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
builder.setErrMsg(strBuffer.append("Put message failed from ")
.append(tubeConfig.getHostName()).append(" ")
- .append((e.getMessage() != null ? e.getMessage() : " ")).toString());
- return builder.build();
- } catch (final Throwable ee) {
- logger.error("Put message failed2 ", ee);
- strBuffer.delete(0, strBuffer.length());
- builder.setSuccess(false);
- builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
- builder.setErrMsg(strBuffer.append("Put message failed2 from ")
- .append(tubeConfig.getHostName()).append(" ")
- .append((ee.getMessage() != null ? ee.getMessage() : " ")).toString());
+ .append((ex.getMessage() != null ? ex.getMessage() : " ")).toString());
return builder.build();
}
}
@@ -886,10 +875,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
logger.info(strBuffer.append("[Consumer Register]").append(clientId)
.append(TokenConstants.SEGMENT_SEP).append(partStr)
.append(TokenConstants.SEGMENT_SEP).append(offsetInfo)
- .append(", requestOffset=").append(reqOffset)
- .append(", req has SSD storeId=").append(request.hasSsdStoreId())
- .append(", req qryPriorityId=").append(reqQryPriorityId)
- .append(", cur SSD storeId=").append(reqSsdStoreId)
+ .append(", reqOffset=").append(reqOffset)
+ .append(", reqSsdStoreId=").append(reqSsdStoreId)
+ .append(", reqQryPriorityId=").append(reqQryPriorityId)
.append(", isOverTLS=").append(overtls).toString());
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1163,7 +1151,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
builder.setErrCode(TErrCodeConstants.UNAUTHORIZED);
builder.setErrMsg("The partition not registered by consumers");
logger.error(strBuffer
- .append("[consumerCommitC2B error] partition not registered by consumers: commit cosnumer is: ")
+ .append("[consumerCommitC2B error] partition not registered by consumers: commit consumer is: ")
.append(clientId).append(", partition is : ").append(partStr).toString());
return builder.build();
}
@@ -1192,7 +1180,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
strBuffer.delete(0, strBuffer.length());
logger.error(strBuffer
.append("[consumerCommitC2B error] " +
- "partition has been registered by other consumer: commit cosnumer is: ")
+ "partition has been registered by other consumer: commit consumer is: ")
.append(clientId).append(", registered consumer is: ").append(consumerNodeInfo.getConsumerId())
.append(", partition is : ").append(partStr).toString());
}