You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/08/12 20:44:24 UTC
[geode-native] branch develop updated: gnmsg - Fix chunked decoding
in some log files (#840)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 6f8d99e gnmsg - Fix chunked decoding in some log files (#840)
6f8d99e is described below
commit 6f8d99e75d4f396df4ccf7d51c5a56f117960703
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Thu Aug 12 13:44:18 2021 -0700
gnmsg - Fix chunked decoding in some log files (#840)
- Chunked messages can come in on multiple threads simultaneously, and
gnmsg only had one decoder. If two message headers came in on
different threads prior to the message body arriving for the first
thread, gnsmg would get confused and think it had received two headers
without a body, throwing an exception. Now we keep a chunk decoder
per thread, and things run much more smoothly.
---
tools/gnmsg/server_message_decoder.py | 45 +++++++++++++++++++----------------
1 file changed, 24 insertions(+), 21 deletions(-)
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index e473b0a..135f95b 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -47,7 +47,7 @@ class ServerMessageDecoder(DecoderBase):
self.parse_response_fields_base,
self.parse_response_fields_v911,
]
- self.chunk_decoder = ChunkedResponseDecoder()
+ self.chunk_decoders_ = {}
self.threads_connections_ = {}
self.connection_to_tid_expression_ = re.compile(
@@ -360,13 +360,12 @@ class ServerMessageDecoder(DecoderBase):
last_header = {"Timestamp": parts[0], "tid": tid, "Connection": parts[2]}
message_bytes = parts[3]
self.headers_[tid] = last_header
- connection = parts[2]
if (
- connection in self.connection_states_.keys()
- and self.connection_states_[connection] != self.STATE_NEUTRAL_
+ tid in self.connection_states_.keys()
+ and self.connection_states_[tid] != self.STATE_NEUTRAL_
):
print("WARNING: Multiple headers rec'd without a message body.")
- self.connection_states_[connection] = self.STATE_NEUTRAL_
+ self.connection_states_[tid] = self.STATE_NEUTRAL_
elif self.get_receive_trace_body_parts(line, parts):
tid = parts[0]
connection = parts[1]
@@ -377,28 +376,34 @@ class ServerMessageDecoder(DecoderBase):
elif self.get_response_header(line, parts):
tid = parts[1]
connection = parts[2]
- self.chunk_decoder.add_header(parts[2], parts[4])
+ if tid in self.chunk_decoders_.keys():
+ self.chunk_decoders_[tid].add_header(parts[2], parts[4])
+ else:
+ self.chunk_decoders_[tid] = ChunkedResponseDecoder()
+ self.chunk_decoders_[tid].add_header(parts[2], parts[4])
elif self.get_chunk_header(line, parts):
flags = 0xFF
size = 0
tid = parts[1]
(flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
- self.chunk_decoder.add_chunk_header(parts[3], flags)
+ if tid in self.chunk_decoders_.keys():
+ self.chunk_decoders_[tid].add_chunk_header(parts[3], flags)
elif self.get_chunk_bytes(line, parts):
tid = parts[1]
- self.chunk_decoder.add_chunk(parts[3])
- if self.chunk_decoder.is_complete_message():
- self.output_queue_.put(
- {"message": self.chunk_decoder.get_decoded_message(parts[0])}
- )
- self.chunk_decoder.reset()
+ if tid in self.chunk_decoders_.keys():
+ self.chunk_decoders_[tid].add_chunk(parts[3])
+ if self.chunk_decoders_[tid].is_complete_message():
+ self.output_queue_.put(
+ {"message": self.chunk_decoders_[tid].get_decoded_message(parts[0])}
+ )
+ self.chunk_decoders_[tid].reset()
else:
return
- if connection not in self.connection_states_:
- self.connection_states_[connection] = self.STATE_NEUTRAL_
+ if tid not in self.connection_states_:
+ self.connection_states_[tid] = self.STATE_NEUTRAL_
- if self.connection_states_[connection] == self.STATE_NEUTRAL_:
+ if self.connection_states_[tid] == self.STATE_NEUTRAL_:
if message_bytes:
last_header = self.headers_[tid]
@@ -412,15 +417,13 @@ class ServerMessageDecoder(DecoderBase):
) = self.parse_response_fields(message_bytes)
self.headers_[tid] = last_header
- self.connection_states_[
- connection
- ] = self.STATE_WAITING_FOR_MESSAGE_BODY_
+ self.connection_states_[tid] = self.STATE_WAITING_FOR_MESSAGE_BODY_
elif (
- self.connection_states_[connection] == self.STATE_WAITING_FOR_MESSAGE_BODY_
+ self.connection_states_[tid] == self.STATE_WAITING_FOR_MESSAGE_BODY_
):
if message_body:
receive_trace = self.headers_[tid]
self.headers_[tid] = None
parse_server_message(receive_trace, message_body)
- self.connection_states_[connection] = self.STATE_NEUTRAL_
+ self.connection_states_[tid] = self.STATE_NEUTRAL_
self.output_queue_.put({"message": receive_trace})