You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/04/19 02:10:14 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-73] remove duplicate codes & some minor updates (#56)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new f694bfd  [TUBEMQ-73] remove duplicate codes & some minor updates (#56)
f694bfd is described below

commit f694bfd61e0391263f5adb9c4c5823be0d9c6d35
Author: Tboy <gu...@immomo.com>
AuthorDate: Sun Apr 19 10:10:07 2020 +0800

    [TUBEMQ-73] remove duplicate codes & some minor updates (#56)
---
 .../tubemq/server/broker/BrokerServiceServer.java  | 48 ++++++++--------------
 1 file changed, 18 insertions(+), 30 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 0ab7c39..170ea2e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -555,18 +555,18 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                         .append(topicName).append(")!\"}");
                 return sb;
             } else {
-                List<String> transferedMessageList = new ArrayList<String>();
+                List<String> transferMessageList = new ArrayList<String>();
                 List<TransferedMessage> tmpMsgList = getMessageResult.transferedMessageList;
                 List<Message> messageList = DataConverterUtil.convertMessage(topicName, tmpMsgList);
                 int startPos = messageList.size() - msgCount < 0 ? 0 : messageList.size() - msgCount;
                 for (; startPos < messageList.size(); startPos++) {
                     String msgItem = new String(
                             Base64.encodeBase64(messageList.get(startPos).getData()));
-                    transferedMessageList.add(msgItem);
+                    transferMessageList.add(msgItem);
                 }
                 int i = 0;
                 sb.append("{\"result\":true,\"errCode\":200,\"errMsg\":\"Success!\",\"dataSet\":[");
-                for (String msgData : transferedMessageList) {
+                for (String msgData : transferMessageList) {
                     if (i > 0) {
                         sb.append(",");
                     }
@@ -651,15 +651,13 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             return builder.build();
         }
         int checkSum = CheckSum.crc32(msgData);
-        if (request.getCheckSum() != -1) {
-            if (checkSum != request.getCheckSum()) {
-                builder.setErrCode(TErrCodeConstants.FORBIDDEN);
-                builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
-                        .append(request.getCheckSum()).append(" of ").append(reqTopic)
-                        .append(" not equal to the data's checksum of ")
-                        .append(checkSum).toString());
-                return builder.build();
-            }
+        if (request.getCheckSum() != -1 && checkSum != request.getCheckSum()) {
+            builder.setErrCode(TErrCodeConstants.FORBIDDEN);
+            builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
+                    .append(request.getCheckSum()).append(" of ").append(reqTopic)
+                    .append(" not equal to the data's checksum of ")
+                    .append(checkSum).toString());
+            return builder.build();
         }
         CertifiedResult authorizeResult =
                 serverAuthHandler.validProduceAuthorizeInfo(
@@ -693,23 +691,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                         .append(", server receive message overflow!").toString());
                 return builder.build();
             }
-        } catch (final Exception e) {
-            logger.error("Put message failed ", e);
+        } catch (final Throwable ex) {
+            logger.error("Put message failed ", ex);
             strBuffer.delete(0, strBuffer.length());
             builder.setSuccess(false);
             builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
             builder.setErrMsg(strBuffer.append("Put message failed from ")
                     .append(tubeConfig.getHostName()).append(" ")
-                    .append((e.getMessage() != null ? e.getMessage() : " ")).toString());
-            return builder.build();
-        } catch (final Throwable ee) {
-            logger.error("Put message failed2 ", ee);
-            strBuffer.delete(0, strBuffer.length());
-            builder.setSuccess(false);
-            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
-            builder.setErrMsg(strBuffer.append("Put message failed2 from ")
-                    .append(tubeConfig.getHostName()).append(" ")
-                    .append((ee.getMessage() != null ? ee.getMessage() : " ")).toString());
+                    .append((ex.getMessage() != null ? ex.getMessage() : " ")).toString());
             return builder.build();
         }
     }
@@ -886,10 +875,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             logger.info(strBuffer.append("[Consumer Register]").append(clientId)
                     .append(TokenConstants.SEGMENT_SEP).append(partStr)
                     .append(TokenConstants.SEGMENT_SEP).append(offsetInfo)
-                    .append(", requestOffset=").append(reqOffset)
-                    .append(", req has SSD storeId=").append(request.hasSsdStoreId())
-                    .append(", req qryPriorityId=").append(reqQryPriorityId)
-                    .append(", cur SSD storeId=").append(reqSsdStoreId)
+                    .append(", reqOffset=").append(reqOffset)
+                    .append(", reqSsdStoreId=").append(reqSsdStoreId)
+                    .append(", reqQryPriorityId=").append(reqQryPriorityId)
                     .append(", isOverTLS=").append(overtls).toString());
             builder.setSuccess(true);
             builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -1163,7 +1151,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             builder.setErrCode(TErrCodeConstants.UNAUTHORIZED);
             builder.setErrMsg("The partition not registered by consumers");
             logger.error(strBuffer
-                    .append("[consumerCommitC2B error] partition not registered by consumers: commit cosnumer is: ")
+                    .append("[consumerCommitC2B error] partition not registered by consumers: commit consumer is: ")
                     .append(clientId).append(", partition is : ").append(partStr).toString());
             return builder.build();
         }
@@ -1192,7 +1180,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             strBuffer.delete(0, strBuffer.length());
             logger.error(strBuffer
                     .append("[consumerCommitC2B error] " +
-                            "partition has been registered by other consumer: commit cosnumer is: ")
+                            "partition has been registered by other consumer: commit consumer is: ")
                     .append(clientId).append(", registered consumer is: ").append(consumerNodeInfo.getConsumerId())
                     .append(", partition is : ").append(partStr).toString());
         }