You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2021/01/26 17:07:34 UTC
[geode-native] branch develop updated: GEODE-8868: Add thread_id
parameter to gnmsg command line (#727)
This is an automated email from the ASF dual-hosted git repository.
echobravo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new a4afaf3 GEODE-8868: Add thread_id parameter to gnmsg command line (#727)
a4afaf3 is described below
commit a4afaf31d9739172171f82e90201dd13abd6e1e1
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Tue Jan 26 09:07:23 2021 -0800
GEODE-8868: Add thread_id parameter to gnmsg command line (#727)
- filter messages by thread, so we can see which responses correspond to
which requests
- also decoded the CONTAINS_KEY message in the course of other debugging
Co-authored-by: Blake Bender <bb...@vmware.com>
---
tools/gnmsg/client_message_decoder.py | 18 +++-
tools/gnmsg/client_messages.py | 11 +++
tools/gnmsg/command_line.py | 4 +-
tools/gnmsg/gnmsg.py | 18 ++--
tools/gnmsg/server_message_decoder.py | 164 +++++++++++++++++++++++++---------
5 files changed, 160 insertions(+), 55 deletions(-)
diff --git a/tools/gnmsg/client_message_decoder.py b/tools/gnmsg/client_message_decoder.py
index c9f9fb5..4631747 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -40,6 +40,9 @@ class ClientMessageDecoder(DecoderBase):
"10.1.1": self.get_send_trace_parts_base,
"10.1.2": self.get_send_trace_parts_base,
"10.1.3": self.get_send_trace_parts_base,
+ "10.1.4": self.get_send_trace_parts_base,
+ "10.2.0": self.get_send_trace_parts_base,
+ "10.2.1": self.get_send_trace_parts_base,
"9.1.1": self.get_send_trace_parts_v911,
}
self.send_trace_parsers = {
@@ -48,6 +51,9 @@ class ClientMessageDecoder(DecoderBase):
"10.1.1": self.parse_request_fields_base,
"10.1.2": self.parse_request_fields_base,
"10.1.3": self.parse_request_fields_base,
+ "10.1.4": self.parse_request_fields_base,
+ "10.2.0": self.parse_request_fields_base,
+ "10.2.1": self.parse_request_fields_base,
"9.1.1": self.parse_request_fields_v911,
}
#
@@ -106,6 +112,8 @@ class ClientMessageDecoder(DecoderBase):
match = expression.search(line)
if match:
parts.append(dateutil.parser.parse(match.group(1)))
+ # TODO: Revisit parsing TID here if we ever see a v9 client log again
+ parts.append("0")
parts.append(match.group(2))
parts.append(match.group(3))
result = True
@@ -115,13 +123,14 @@ class ClientMessageDecoder(DecoderBase):
def get_send_trace_parts_base(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+(\d+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
parts.append(parser.parse(match.group(1)))
parts.append(match.group(2))
parts.append(match.group(3))
+ parts.append(match.group(4))
result = True
return result
@@ -133,7 +142,7 @@ class ClientMessageDecoder(DecoderBase):
def get_add_security_trace_parts(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
@@ -141,6 +150,7 @@ class ClientMessageDecoder(DecoderBase):
parts.append(match.group(2))
parts.append(match.group(3))
parts.append(match.group(4))
+ parts.append(match.group(5))
result = True
return result
@@ -208,10 +218,10 @@ class ClientMessageDecoder(DecoderBase):
parts = []
if self.get_send_trace_parts(line, parts):
- send_trace["Timestamp"], send_trace["Connection"], message_bytes = parts
+ send_trace["Timestamp"], send_trace["tid"], send_trace["Connection"], message_bytes = parts
is_send_trace = True
elif self.get_add_security_trace_parts(line, parts):
- timestamp, connection, security_footer_length, message_bytes = parts
+ timestamp, tid, connection, security_footer_length, message_bytes = parts
is_add_security_trace = True
else:
return
diff --git a/tools/gnmsg/client_messages.py b/tools/gnmsg/client_messages.py
index 6f8b9e1..a6f03ae 100644
--- a/tools/gnmsg/client_messages.py
+++ b/tools/gnmsg/client_messages.py
@@ -265,6 +265,16 @@ def read_close_connection_message(properties, message_bytes, offset):
properties["ObjectPart"] = object_part
+def read_contains_key_message(properties, message_bytes, offset):  # decode a CONTAINS_KEY request into properties
+ (properties["RegionPart"], offset) = parse_region_part(message_bytes, offset)
+ (properties["Key"], offset) = parse_key_or_value(message_bytes, offset)
+ (request_type, offset) = parse_raw_int_part(message_bytes, offset)
+ if request_type["Value"] == 1:
+ properties["RequestType"] = "ContainsValueForKey"  # assignment; a bare == comparison here would store nothing
+ else:
+ properties["RequestType"] = "ContainsKey"
+
+
def read_destroy_message(properties, message_bytes, offset):
if properties["Parts"] > 5:
raise Exception(
@@ -390,6 +400,7 @@ client_message_parsers = {
"PUT": read_put_message,
"REQUEST": read_request_message,
"CLOSE_CONNECTION": read_close_connection_message,
+ "CONTAINS_KEY": read_contains_key_message,
"DESTROY": read_destroy_message,
"GET_CLIENT_PARTITION_ATTRIBUTES": read_get_client_partition_attributes_message,
"GET_CLIENT_PR_METADATA": read_get_client_pr_metadata_message,
diff --git a/tools/gnmsg/command_line.py b/tools/gnmsg/command_line.py
index dac4c59..aea92d3 100755
--- a/tools/gnmsg/command_line.py
+++ b/tools/gnmsg/command_line.py
@@ -37,6 +37,8 @@ def parse_command_line():
help="(optionally) print out regular message details",
)
+ parser.add_argument("--thread_id", metavar="T", nargs="?", help="Show only messages on this thread")
+
args = parser.parse_args()
if args.file is None:
@@ -44,4 +46,4 @@ def parse_command_line():
parser.print_help()
sys.exit(1)
- return (args.file, args.handshake, args.messages)
+ return (args.file, args.handshake, args.messages, args.thread_id)
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index 37ff72f..fd27b83 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -44,7 +44,7 @@ from server_message_decoder import ServerMessageDecoder
from handshake_decoder import HandshakeDecoder
-def scan_file(filename, dump_handshake, dump_messages):
+def scan_file(filename, dump_handshake, dump_messages, thread_id):
output_queue = queue.Queue()
separator = ""
if dump_handshake:
@@ -56,7 +56,7 @@ def scan_file(filename, dump_handshake, dump_messages):
data = output_queue.get_nowait()
for key, value in data.items():
if key == "handshake":
- print(separator + json.dumps(data, indent=2, default=str))
+ print(separator + json.dumps(value, indent=2, default=str))
separator = ","
except queue.Empty:
continue
@@ -73,8 +73,14 @@ def scan_file(filename, dump_handshake, dump_messages):
data = output_queue.get_nowait()
for key, value in data.items():
if key == "message" and dump_messages:
- print(separator + json.dumps(data, indent=2, default=str))
- separator = ","
+ if thread_id:
+ if "tid" in value.keys() and value["tid"] == thread_id:
+ print(separator + json.dumps(value, indent=2, default=str))
+ separator = ","
+ else:
+ print(separator + json.dumps(value, indent=2, default=str))
+ separator = ","
+
except queue.Empty:
continue
except:
@@ -92,5 +98,5 @@ def scan_file(filename, dump_handshake, dump_messages):
if __name__ == "__main__":
- (file, handshake, messages) = command_line.parse_command_line()
- scan_file(file, handshake, messages)
+ (file, handshake, messages, thread_id) = command_line.parse_command_line()
+ scan_file(file, handshake, messages, thread_id)
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index b6fcc96..46ca955 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -15,6 +15,7 @@
# limitations under the License.
import re
import struct
+import sys
from dateutil import parser
@@ -33,7 +34,7 @@ class ServerMessageDecoder(DecoderBase):
self.receive_trace_parts_retriever_ = None
self.receive_trace_parser_ = None
self.connection_states_ = {}
- self.last_header_ = {}
+ self.headers_ = {}
self.nc_version_ = None
self.get_receive_trace_parts_functions_ = {
"0.0.42": self.get_receive_trace_header_base,
@@ -41,6 +42,9 @@ class ServerMessageDecoder(DecoderBase):
"10.1.1": self.get_receive_trace_header_base,
"10.1.2": self.get_receive_trace_header_base,
"10.1.3": self.get_receive_trace_header_base,
+ "10.1.4": self.get_receive_trace_header_base,
+ "10.2.0": self.get_receive_trace_header_base,
+ "10.2.1": self.get_receive_trace_header_base,
"9.1.1": self.get_receive_trace_header_v911,
}
self.receive_trace_parsers_ = {
@@ -49,9 +53,13 @@ class ServerMessageDecoder(DecoderBase):
"10.1.1": self.parse_response_fields_base,
"10.1.2": self.parse_response_fields_base,
"10.1.3": self.parse_response_fields_base,
+ "10.1.4": self.parse_response_fields_base,
+ "10.2.0": self.parse_response_fields_base,
+ "10.2.1": self.parse_response_fields_base,
"9.1.1": self.parse_response_fields_v911,
}
self.chunk_decoder = ChunkedResponseDecoder()
+ self.threads_connections_ = {}
def search_for_version(self, line):
if self.nc_version_ == None:
@@ -69,16 +77,34 @@ class ServerMessageDecoder(DecoderBase):
self.nc_version_
]
+ def associate_connection_to_tid(self, line):
+ result = False
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+(\d+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ )
+ match = expression.search(line)
+ if match:
+ tid = match.group(2)
+ connection = match.group(3)
+ self.threads_connections_[tid] = connection
+ result = True
+
+ return result
def get_receive_trace_header_with_pointer(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readMessage:\s*\[([\d|a-f|A-F|x|X]+).*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage:\s*\[([\d|a-f|A-F|x|X]+).*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
- parts.append(match.group(1))
- parts.append(match.group(2))
- parts.append(match.group(3))
+ date_time = match.group(1)
+ tid = match.group(2)
+ connection = match.group(3)
+ bytes = match.group(4)
+ parts.append(date_time)
+ parts.append(tid)
+ parts.append(connection)
+ parts.append(bytes)
result = True
return result
@@ -86,13 +112,18 @@ class ServerMessageDecoder(DecoderBase):
def get_receive_trace_header_without_pointer(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
+ tid = match.group(2)
parts.append(match.group(1))
- parts.append("0")
- parts.append(match.group(2))
+ parts.append(tid)
+ if tid in self.threads_connections_.keys():
+ parts.append(self.threads_connections_[tid])
+ else:
+ parts.append("0")
+ parts.append(match.group(3))
result = True
return result
@@ -107,13 +138,18 @@ class ServerMessageDecoder(DecoderBase):
def get_receive_trace_header_v911(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readMessage: received header from endpoint.*bytes:\s*([\d| ]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage: received header from endpoint.*bytes:\s*([\d| ]+)"
)
match = expression.search(line)
if match:
+ tid = match.group(2)
parts.append(parser.parse(match.group(1)))
- parts.append("0")
- parts.append(match.group(2))
+ parts.append(tid)  # raw thread-id string, the same value looked up in threads_connections_ below; not a timestamp
+ if tid in self.threads_connections_.keys():
+ parts.append(self.threads_connections_[tid])
+ else:
+ parts.append("0")
+ parts.append(match.group(3))
result = True
return result
@@ -121,12 +157,17 @@ class ServerMessageDecoder(DecoderBase):
def get_receive_trace_body_parts(self, line, parts):
result = False
expression = re.compile(
- "received message body from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ ":\d+\s+(\d+)\]\s*TcrConnection::readMessage: received message body from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
- message = match.group(1)
- parts.append(message)
+ tid = match.group(1)
+ parts.append(tid)
+ if tid in self.threads_connections_.keys():
+ parts.append(self.threads_connections_[tid])
+ else:
+ parts.append("0")
+ parts.append(match.group(2))
result = True
return result
@@ -138,7 +179,7 @@ class ServerMessageDecoder(DecoderBase):
def get_add_security_trace_parts(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
@@ -146,6 +187,7 @@ class ServerMessageDecoder(DecoderBase):
parts.append(match.group(2))
parts.append(match.group(3))
parts.append(match.group(4))
+ parts.append(match.group(5))
result = True
return result
@@ -156,7 +198,7 @@ class ServerMessageDecoder(DecoderBase):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
@@ -164,18 +206,20 @@ class ServerMessageDecoder(DecoderBase):
parts.append(match.group(2))
parts.append(match.group(3))
parts.append(match.group(4))
+ parts.append(match.group(5))
result = True
if not result:
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
parts.append(parser.parse(match.group(1)))
- parts.append("")
parts.append(match.group(2))
+ parts.append("")
parts.append(match.group(3))
+ parts.append(match.group(4))
result = True
return result
@@ -184,7 +228,7 @@ class ServerMessageDecoder(DecoderBase):
# Check if this is a header for a chunked message
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader\(([0-9|a-f|A-F|x]+)\):\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader\(([0-9|a-f|A-F|x]+)\):\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
)
match = expression.search(line)
if match:
@@ -192,18 +236,24 @@ class ServerMessageDecoder(DecoderBase):
parts.append(match.group(2))
parts.append(match.group(3))
parts.append(match.group(4))
+ parts.append(match.group(5))
result = True
if not result:
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
)
match = expression.search(line)
if match:
parts.append(parser.parse(match.group(1)))
- parts.append("")
- parts.append(match.group(2))
+ tid = match.group(2)
+ parts.append(tid)
+ if tid in self.threads_connections_.keys():
+ parts.append(self.threads_connections_[tid])
+ else:
+ parts.append("0")
parts.append(match.group(3))
+ parts.append(match.group(4))
result = True
return result
@@ -213,7 +263,7 @@ class ServerMessageDecoder(DecoderBase):
# If it is, add it to the chunked decoder
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
@@ -221,18 +271,24 @@ class ServerMessageDecoder(DecoderBase):
parts.append(match.group(2))
parts.append(match.group(3))
parts.append(match.group(4))
+ parts.append(match.group(5))
result = True
if not result:
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
parts.append(parser.parse(match.group(1)))
- parts.append("")
- parts.append(match.group(2))
+ tid = match.group(2)
+ parts.append(tid)
+ if tid in self.threads_connections_.keys():
+ parts.append(self.threads_connections_[tid])
+ else:
+ parts.append("0")
parts.append(match.group(3))
+ parts.append(match.group(4))
result = True
return result
@@ -292,28 +348,43 @@ class ServerMessageDecoder(DecoderBase):
message_bytes = None
message_body = None
chunk_bytes = None
+ tid = None
self.search_for_version(line)
parts = []
- if self.get_receive_trace_parts(line, parts):
- (
- self.last_header_["Timestamp"],
- self.last_header_["Connection"],
- message_bytes,
- ) = parts
+ if self.associate_connection_to_tid(line):
+ pass
+ elif self.get_receive_trace_parts(line, parts):
+ tid = parts[1]
+ last_header = {"Timestamp": parts[0],
+ "tid": tid,
+ "Connection": parts[2]}
+ message_bytes = parts[3]
+ self.headers_[tid] = last_header
+ connection = parts[2]
+ if connection in self.connection_states_.keys() and self.connection_states_[connection] != self.STATE_NEUTRAL_:
+ print("WARNING: Multiple headers rec'd without a message body.")
+ self.connection_states_[connection] = self.STATE_NEUTRAL_
elif self.get_receive_trace_body_parts(line, parts):
- message_body = parts[0]
- elif self.get_add_security_trace_parts(line, parts):
+ tid = parts[0]
connection = parts[1]
+ message_body = parts[2]
+ elif self.get_add_security_trace_parts(line, parts):
+ tid = parts[1]
+ connection = parts[2]
elif self.get_response_header(line, parts):
- self.chunk_decoder.add_header(parts[1], parts[3])
+ tid = parts[1]
+ connection = parts[2]
+ self.chunk_decoder.add_header(parts[2], parts[4])
elif self.get_chunk_header(line, parts):
flags = 0xff
size = 0
- (flags, size) = read_number_from_hex_string(parts[3], 2, len(parts[3]) - 2)
+ tid = parts[1]
+ (flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
self.chunk_decoder.add_chunk_header(parts[2], flags)
elif self.get_chunk_bytes(line, parts):
+ tid = parts[1]
self.chunk_decoder.add_chunk(parts[3])
if self.chunk_decoder.is_complete_message():
self.output_queue_.put({"message": self.chunk_decoder.get_decoded_message()})
@@ -326,14 +397,18 @@ class ServerMessageDecoder(DecoderBase):
if self.connection_states_[connection] == self.STATE_NEUTRAL_:
if message_bytes:
- self.last_header_["Direction"] = "<---"
+
+ last_header = self.headers_[tid]
+ last_header["Direction"] = "<---"
(
- self.last_header_["Type"],
- self.last_header_["Length"],
- self.last_header_["Parts"],
- self.last_header_["TransactionId"],
- self.last_header_["SecurityFlag"],
+ last_header["Type"],
+ last_header["Length"],
+ last_header["Parts"],
+ last_header["TransactionId"],
+ last_header["SecurityFlag"],
) = self.parse_response_fields(message_bytes)
+ self.headers_[tid] = last_header
+
self.connection_states_[
connection
] = self.STATE_WAITING_FOR_MESSAGE_BODY_
@@ -341,8 +416,9 @@ class ServerMessageDecoder(DecoderBase):
self.connection_states_[connection] == self.STATE_WAITING_FOR_MESSAGE_BODY_
):
if message_body:
- receive_trace = self.last_header_
- self.last_header_ = {}
+ receive_trace = self.headers_[tid]
+ self.headers_[tid] = None
parse_server_message(receive_trace, message_body)
self.connection_states_[connection] = self.STATE_NEUTRAL_
self.output_queue_.put({"message": receive_trace})
+