You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/16 09:04:35 UTC
[incubator-tubemq] 01/01: Revert "[TUBEMQ-356]C++ SDK Codec decode add requestid (#268)"
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch revert-268-TUBEMQ-356
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit afe8ca5536baca635e9a2059d6d86ac2b723c377
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed Sep 16 09:04:27 2020 +0000
Revert "[TUBEMQ-356]C++ SDK Codec decode add requestid (#268)"
This reverts commit 60950a6e2e15862e4c37e927ddba597fe0ced8bd.
---
.../tubemq-client-cpp/src/client_connection.cc | 9 +++-
.../tubemq-client-cpp/src/codec_protocol.h | 2 +-
.../tubemq-client-cpp/src/tubemq_codec.h | 53 ++++++++++++++--------
3 files changed, 44 insertions(+), 20 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
index 08288a0..e6ae503 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
@@ -168,14 +168,21 @@ void ClientConnection::asyncRead() {
recv_buffer_->WriteBytes(len);
std::error_code error;
size_t availsize = socket_->available(error);
+ LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, availsize:%ld, recvbuffer:%s",
+ ToString().c_str(), len, package_length_, availsize,
+ recv_buffer_->String().c_str());
if (availsize > 0 && !error) {
recv_buffer_->EnsureWritableBytes(availsize);
size_t rlen = socket_->receive(asio::buffer(recv_buffer_->WriteBegin(), availsize));
if (rlen > 0) {
recv_buffer_->WriteBytes(rlen);
}
+ LOG_TRACE("[%s]syncread done, receivelen:%ld, recvbuffer:%s", ToString().c_str(), rlen,
+ recv_buffer_->String().c_str());
}
while (checkPackageDone() > 0 && recv_buffer_->length() > 0) {
+ LOG_TRACE("[%s]recheck packagedone package_length_:%ld, recvbuffer:%s",
+ ToString().c_str(), package_length_, recv_buffer_->String().c_str());
}
asyncRead();
});
@@ -241,7 +248,7 @@ void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any*
ResponseContext rsp;
BufferPtr* buff = any_cast<BufferPtr>(check_out);
if (buff != nullptr) {
- req->req_->codec_->Decode(*buff, request_id, rsp.rsp_);
+ req->req_->codec_->Decode(*buff, rsp.rsp_);
} else {
rsp.rsp_ = *check_out;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h b/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h
index 953ceaa..82e68e7 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/codec_protocol.h
@@ -37,7 +37,7 @@ class CodecProtocol {
virtual std::string Name() const = 0;
- virtual bool Decode(const BufferPtr &buff, uint32_t request_id, Any &out) = 0;
+ virtual bool Decode(const BufferPtr &buff, Any &out) = 0;
virtual bool Encode(const Any &in, BufferPtr &buff) = 0;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
index b3996df..c186701 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
@@ -72,10 +72,12 @@ class TubeMQCodec final : public CodecProtocol {
virtual std::string Name() const { return "tubemq_v1"; }
- virtual bool Decode(const BufferPtr &buff, uint32_t request_id, Any &out) {
+ virtual bool Decode(const BufferPtr &buff, Any &out) {
// check total length
+ LOG_TRACE("Decode: full message Decode come, begin decode");
int32_t total_len = buff->length();
if (total_len <= 0) {
+ LOG_TRACE("Decode: total_len <= 0, out, total_len = %d", total_len);
return false;
}
// check package is valid
@@ -86,10 +88,12 @@ class TubeMQCodec final : public CodecProtocol {
google::protobuf::io::ArrayInputStream rawOutput(buff->data(), total_len);
bool result = readDelimitedFrom(&rawOutput, &rpc_header);
if (!result) {
+ LOG_TRACE("Decode: parse RpcConnHeader failure, out");
return result;
}
result = readDelimitedFrom(&rawOutput, &rsp_header);
if (!result) {
+ LOG_TRACE("Decode: parse ResponseHeader failure, out");
return result;
}
ResponseHeader_Status rspStatus = rsp_header.status();
@@ -100,6 +104,7 @@ class TubeMQCodec final : public CodecProtocol {
rsp_protocol->error_msg_ = "OK";
result = readDelimitedFrom(&rawOutput, &response_body);
if (!result) {
+ LOG_TRACE("Decode: parse RspResponseBody failure, out");
return false;
}
rsp_protocol->method_ = response_body.method();
@@ -109,24 +114,27 @@ class TubeMQCodec final : public CodecProtocol {
rsp_protocol->success_ = false;
result = readDelimitedFrom(&rawOutput, &rpc_exception);
if (!result) {
+ LOG_TRACE("Decode: parse RspExceptionBody failure, out");
return false;
}
- string err_info = rpc_exception.exceptionname();
- err_info += delimiter::kDelimiterPound;
- err_info += rpc_exception.stacktrace();
+ string errInfo = rpc_exception.exceptionname();
+ errInfo += delimiter::kDelimiterPound;
+ errInfo += rpc_exception.stacktrace();
rsp_protocol->code_ = err_code::kErrRcvThrowError;
- rsp_protocol->error_msg_ = err_info;
+ rsp_protocol->error_msg_ = errInfo;
}
- rsp_protocol->serial_no_ = request_id;
out = Any(rsp_protocol);
- LOG_TRACE("Decode: decode message finished, success_=%d, request_id=%d",
- rsp_protocol->success_, request_id);
+ LOG_TRACE("Decode: decode message success, finished");
return true;
}
virtual bool Encode(const Any &in, BufferPtr &buff) {
RequestBody req_body;
ReqProtocolPtr req_protocol = any_cast<ReqProtocolPtr>(in);
+
+ LOG_TRACE("Encode: begin encode message, request_id=%d, method_id=%d",
+ req_protocol->request_id_, req_protocol->method_id_);
+
req_body.set_method(req_protocol->method_id_);
req_body.set_timeout(req_protocol->rpc_read_timeout_);
req_body.set_request(req_protocol->prot_msg_);
@@ -164,54 +172,63 @@ class TubeMQCodec final : public CodecProtocol {
if (slice_len > rpc_config::kRpcMaxBufferSize) {
slice_len = rpc_config::kRpcMaxBufferSize;
}
- LOG_TRACE("Encode: encode slice [%d] slice_len = %d, serial_len = %d",
- i, slice_len, serial_len);
+ LOG_TRACE("Encode: encode slice [%d] slice_len = %d, serial_len = %d", i, slice_len,
+ serial_len);
buff->AppendInt32(slice_len);
buff->Write(step_buff.data() + write_pos, slice_len);
write_pos += slice_len;
}
- LOG_TRACE("Encode: encode message success, request_id=%d, method_id=%d",
- req_protocol->request_id_, req_protocol->method_id_);
+ LOG_TRACE("Encode: encode message success, finished!");
return true;
}
// return code: -1 failed; 0-Unfinished; > 0 package buffer size
virtual int32_t Check(BufferPtr &in, Any &out, uint32_t &request_id, bool &has_request_id,
size_t &package_length) {
+ LOG_TRACE("check in:%s", in->String().c_str());
// check package is valid
if (in->length() < 12) {
- package_length = 12;
+ // package_length = 12;
+ LOG_TRACE("Check: data's length < 12, is %ld, out", in->length());
return 0;
}
// check frameToken
uint32_t token = in->ReadUint32();
if (token != rpc_config::kRpcPrtBeginToken) {
+ LOG_TRACE("Check: first token is illegal, is %d, out", token);
return -1;
}
// get request_id
request_id = in->ReadUint32();
uint32_t list_size = in->ReadUint32();
if (list_size > rpc_config::kRpcMaxFrameListCnt) {
+ LOG_TRACE("Check: list_size over max, is %d, out", list_size);
return -1;
}
// check data list
uint32_t item_len = 0;
- package_length = 12;
+ // package_length = 12;
auto check_buf = in->Slice();
for (uint32_t i = 0; i < list_size; i++) {
- package_length += 4;
+ // package_length += 4;
if (check_buf->length() < 4) {
+ LOG_TRACE("Check: buffer Remaining length < 4, is %ld, out", check_buf->length());
return 0;
}
item_len = check_buf->ReadUint32();
if (item_len == 0) {
+ LOG_TRACE("Check: slice length == 0, is %d, out", item_len);
return -1;
}
if (item_len > rpc_config::kRpcMaxBufferSize) {
+ LOG_TRACE("Check: item_len(%d) > max item length(%d), out", item_len,
+ rpc_config::kRpcMaxBufferSize);
return -1;
}
- package_length += item_len;
+ // package_length += item_len;
if (item_len > check_buf->length()) {
+ LOG_TRACE("Check: item_len(%d) > remaining length(%ld), out", item_len,
+ check_buf->length());
return 0;
}
check_buf->Skip(item_len);
@@ -227,8 +244,8 @@ class TubeMQCodec final : public CodecProtocol {
in->Skip(item_len);
}
out = buf;
- LOG_TRACE("Check: received message check success, request_id=%d, readed_len:%d",
- request_id, readed_len);
+ LOG_TRACE("Check: received message check finished, request_id=%d, readed_len:%d", request_id,
+ readed_len);
return readed_len;
}