You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/16 09:04:35 UTC

[incubator-tubemq] 01/01: Revert "[TUBEMQ-356]C++ SDK Codec decode add requestid (#268)"

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch revert-268-TUBEMQ-356
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit afe8ca5536baca635e9a2059d6d86ac2b723c377
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Sep 16 09:04:27 2020 +0000

    Revert "[TUBEMQ-356]C++ SDK Codec decode add requestid (#268)"
    
    This reverts commit 60950a6e2e15862e4c37e927ddba597fe0ced8bd.
---
 .../tubemq-client-cpp/src/client_connection.cc     |  9 +++-
 .../tubemq-client-cpp/src/codec_protocol.h         |  2 +-
 .../tubemq-client-cpp/src/tubemq_codec.h           | 53 ++++++++++++++--------
 3 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
index 08288a0..e6ae503 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
@@ -168,14 +168,21 @@ void ClientConnection::asyncRead() {
         recv_buffer_->WriteBytes(len);
         std::error_code error;
         size_t availsize = socket_->available(error);
+        LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, availsize:%ld, recvbuffer:%s",
+                  ToString().c_str(), len, package_length_, availsize,
+                  recv_buffer_->String().c_str());
         if (availsize > 0 && !error) {
           recv_buffer_->EnsureWritableBytes(availsize);
           size_t rlen = socket_->receive(asio::buffer(recv_buffer_->WriteBegin(), availsize));
           if (rlen > 0) {
             recv_buffer_->WriteBytes(rlen);
           }
+          LOG_TRACE("[%s]syncread done, receivelen:%ld, recvbuffer:%s", ToString().c_str(), rlen,
+                    recv_buffer_->String().c_str());
         }
         while (checkPackageDone() > 0 && recv_buffer_->length() > 0) {
+          LOG_TRACE("[%s]recheck packagedone package_length_:%ld, recvbuffer:%s",
+                    ToString().c_str(), package_length_, recv_buffer_->String().c_str());
         }
         asyncRead();
       });
@@ -241,7 +248,7 @@ void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any*
     ResponseContext rsp;
     BufferPtr* buff = any_cast<BufferPtr>(check_out);
     if (buff != nullptr) {
-      req->req_->codec_->Decode(*buff, request_id, rsp.rsp_);
+      req->req_->codec_->Decode(*buff, rsp.rsp_);
     } else {
       rsp.rsp_ = *check_out;
     }
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h b/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h
index 953ceaa..82e68e7 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h
@@ -37,7 +37,7 @@ class CodecProtocol {
 
   virtual std::string Name() const = 0;
 
-  virtual bool Decode(const BufferPtr &buff, uint32_t request_id, Any &out) = 0;
+  virtual bool Decode(const BufferPtr &buff, Any &out) = 0;
 
   virtual bool Encode(const Any &in, BufferPtr &buff) = 0;
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
index b3996df..c186701 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
@@ -72,10 +72,12 @@ class TubeMQCodec final : public CodecProtocol {
 
   virtual std::string Name() const { return "tubemq_v1"; }
 
-  virtual bool Decode(const BufferPtr &buff, uint32_t request_id, Any &out) {
+  virtual bool Decode(const BufferPtr &buff, Any &out) {
     // check total length
+    LOG_TRACE("Decode: full message Decode come, begin decode");
     int32_t total_len = buff->length();
     if (total_len <= 0) {
+      LOG_TRACE("Decode: total_len <= 0, out, total_len = %d", total_len);
       return false;
     }
     // check package is valid
@@ -86,10 +88,12 @@ class TubeMQCodec final : public CodecProtocol {
     google::protobuf::io::ArrayInputStream rawOutput(buff->data(), total_len);
     bool result = readDelimitedFrom(&rawOutput, &rpc_header);
     if (!result) {
+      LOG_TRACE("Decode: parse RpcConnHeader failure, out");
       return result;
     }
     result = readDelimitedFrom(&rawOutput, &rsp_header);
     if (!result) {
+      LOG_TRACE("Decode: parse ResponseHeader failure, out");
       return result;
     }
     ResponseHeader_Status rspStatus = rsp_header.status();
@@ -100,6 +104,7 @@ class TubeMQCodec final : public CodecProtocol {
       rsp_protocol->error_msg_ = "OK";
       result = readDelimitedFrom(&rawOutput, &response_body);
       if (!result) {
+        LOG_TRACE("Decode: parse RspResponseBody failure, out");
         return false;
       }
       rsp_protocol->method_ = response_body.method();
@@ -109,24 +114,27 @@ class TubeMQCodec final : public CodecProtocol {
       rsp_protocol->success_ = false;
       result = readDelimitedFrom(&rawOutput, &rpc_exception);
       if (!result) {
+        LOG_TRACE("Decode: parse RspExceptionBody failure, out");
         return false;
       }
-      string err_info = rpc_exception.exceptionname();
-      err_info += delimiter::kDelimiterPound;
-      err_info += rpc_exception.stacktrace();
+      string errInfo = rpc_exception.exceptionname();
+      errInfo += delimiter::kDelimiterPound;
+      errInfo += rpc_exception.stacktrace();
       rsp_protocol->code_ = err_code::kErrRcvThrowError;
-      rsp_protocol->error_msg_ = err_info;
+      rsp_protocol->error_msg_ = errInfo;
     }
-    rsp_protocol->serial_no_ = request_id;
     out = Any(rsp_protocol);
-    LOG_TRACE("Decode: decode message finished, success_=%d, request_id=%d",
-      rsp_protocol->success_, request_id);
+    LOG_TRACE("Decode: decode message success, finished");
     return true;
   }
 
   virtual bool Encode(const Any &in, BufferPtr &buff) {
     RequestBody req_body;
     ReqProtocolPtr req_protocol = any_cast<ReqProtocolPtr>(in);
+
+    LOG_TRACE("Encode: begin encode message, request_id=%d, method_id=%d",
+              req_protocol->request_id_, req_protocol->method_id_);
+
     req_body.set_method(req_protocol->method_id_);
     req_body.set_timeout(req_protocol->rpc_read_timeout_);
     req_body.set_request(req_protocol->prot_msg_);
@@ -164,54 +172,63 @@ class TubeMQCodec final : public CodecProtocol {
       if (slice_len > rpc_config::kRpcMaxBufferSize) {
         slice_len = rpc_config::kRpcMaxBufferSize;
       }
-      LOG_TRACE("Encode: encode slice [%d] slice_len = %d, serial_len = %d",
-        i, slice_len, serial_len);
+      LOG_TRACE("Encode: encode slice [%d] slice_len = %d, serial_len = %d", i, slice_len,
+                serial_len);
       buff->AppendInt32(slice_len);
       buff->Write(step_buff.data() + write_pos, slice_len);
       write_pos += slice_len;
     }
-    LOG_TRACE("Encode: encode message success, request_id=%d, method_id=%d",
-              req_protocol->request_id_, req_protocol->method_id_);
+    LOG_TRACE("Encode: encode message success, finished!");
     return true;
   }
 
   // return code: -1 failed; 0-Unfinished; > 0 package buffer size
   virtual int32_t Check(BufferPtr &in, Any &out, uint32_t &request_id, bool &has_request_id,
                         size_t &package_length) {
+    LOG_TRACE("check in:%s", in->String().c_str());
     // check package is valid
     if (in->length() < 12) {
-      package_length = 12;
+      // package_length = 12;
+      LOG_TRACE("Check: data's length < 12, is %ld, out", in->length());
       return 0;
     }
     // check frameToken
     uint32_t token = in->ReadUint32();
     if (token != rpc_config::kRpcPrtBeginToken) {
+      LOG_TRACE("Check: first token is illegal, is %d, out", token);
       return -1;
     }
     // get request_id
     request_id = in->ReadUint32();
     uint32_t list_size = in->ReadUint32();
     if (list_size > rpc_config::kRpcMaxFrameListCnt) {
+      LOG_TRACE("Check: list_size over max, is %d, out", list_size);
       return -1;
     }
     // check data list
     uint32_t item_len = 0;
-    package_length = 12;
+    // package_length = 12;
     auto check_buf = in->Slice();
     for (uint32_t i = 0; i < list_size; i++) {
-      package_length += 4;
+      // package_length += 4;
       if (check_buf->length() < 4) {
+        LOG_TRACE("Check: buffer Remaining length < 4, is %ld, out", check_buf->length());
         return 0;
       }
       item_len = check_buf->ReadUint32();
       if (item_len == 0) {
+        LOG_TRACE("Check: slice length == 0, is %d, out", item_len);
         return -1;
       }
       if (item_len > rpc_config::kRpcMaxBufferSize) {
+        LOG_TRACE("Check: item_len(%d) > max item length(%d), out", item_len,
+                  rpc_config::kRpcMaxBufferSize);
         return -1;
       }
-      package_length += item_len;
+      // package_length += item_len;
       if (item_len > check_buf->length()) {
+        LOG_TRACE("Check: item_len(%d) > remaining length(%ld), out", item_len,
+                  check_buf->length());
         return 0;
       }
       check_buf->Skip(item_len);
@@ -227,8 +244,8 @@ class TubeMQCodec final : public CodecProtocol {
       in->Skip(item_len);
     }
     out = buf;
-    LOG_TRACE("Check: received message check success, request_id=%d, readed_len:%d",
-      request_id, readed_len);
+    LOG_TRACE("Check: received message check finished, request_id=%d, readed_len:%d", request_id,
+              readed_len);
     return readed_len;
   }