You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2021/01/26 17:07:34 UTC

[geode-native] branch develop updated: GEODE-8868: Add thread_id parameter to gnmsg command line (#727)

This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new a4afaf3  GEODE-8868: Add thread_id parameter to gnmsg command line (#727)
a4afaf3 is described below

commit a4afaf31d9739172171f82e90201dd13abd6e1e1
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Tue Jan 26 09:07:23 2021 -0800

    GEODE-8868: Add thread_id parameter to gnmsg command line (#727)
    
    - filter messages by thread, so we can see which responses correspond to
      which requests
    - also decoded the CONTAINS_KEY message in the course of other debugging
    
    Co-authored-by: Blake Bender <bb...@vmware.com>
---
 tools/gnmsg/client_message_decoder.py |  18 +++-
 tools/gnmsg/client_messages.py        |  11 +++
 tools/gnmsg/command_line.py           |   4 +-
 tools/gnmsg/gnmsg.py                  |  18 ++--
 tools/gnmsg/server_message_decoder.py | 164 +++++++++++++++++++++++++---------
 5 files changed, 160 insertions(+), 55 deletions(-)

diff --git a/tools/gnmsg/client_message_decoder.py b/tools/gnmsg/client_message_decoder.py
index c9f9fb5..4631747 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -40,6 +40,9 @@ class ClientMessageDecoder(DecoderBase):
             "10.1.1": self.get_send_trace_parts_base,
             "10.1.2": self.get_send_trace_parts_base,
             "10.1.3": self.get_send_trace_parts_base,
+            "10.1.4": self.get_send_trace_parts_base,
+            "10.2.0": self.get_send_trace_parts_base,
+            "10.2.1": self.get_send_trace_parts_base,
             "9.1.1": self.get_send_trace_parts_v911,
         }
         self.send_trace_parsers = {
@@ -48,6 +51,9 @@ class ClientMessageDecoder(DecoderBase):
             "10.1.1": self.parse_request_fields_base,
             "10.1.2": self.parse_request_fields_base,
             "10.1.3": self.parse_request_fields_base,
+            "10.1.4": self.parse_request_fields_base,
+            "10.2.0": self.parse_request_fields_base,
+            "10.2.1": self.parse_request_fields_base,
             "9.1.1": self.parse_request_fields_v911,
         }
         #
@@ -106,6 +112,8 @@ class ClientMessageDecoder(DecoderBase):
         match = expression.search(line)
         if match:
             parts.append(dateutil.parser.parse(match.group(1)))
+            # TODO: Revisit parsing TID here if we ever see a v9 client log again
+            parts.append("0")
             parts.append(match.group(2))
             parts.append(match.group(3))
             result = True
@@ -115,13 +123,14 @@ class ClientMessageDecoder(DecoderBase):
     def get_send_trace_parts_base(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+(\d+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
             parts.append(parser.parse(match.group(1)))
             parts.append(match.group(2))
             parts.append(match.group(3))
+            parts.append(match.group(4))
             result = True
 
         return result
@@ -133,7 +142,7 @@ class ClientMessageDecoder(DecoderBase):
     def get_add_security_trace_parts(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
@@ -141,6 +150,7 @@ class ClientMessageDecoder(DecoderBase):
             parts.append(match.group(2))
             parts.append(match.group(3))
             parts.append(match.group(4))
+            parts.append(match.group(5))
             result = True
 
         return result
@@ -208,10 +218,10 @@ class ClientMessageDecoder(DecoderBase):
 
         parts = []
         if self.get_send_trace_parts(line, parts):
-            send_trace["Timestamp"], send_trace["Connection"], message_bytes = parts
+            send_trace["Timestamp"], send_trace["tid"], send_trace["Connection"], message_bytes = parts
             is_send_trace = True
         elif self.get_add_security_trace_parts(line, parts):
-            timestamp, connection, security_footer_length, message_bytes = parts
+            timestamp, tid, connection, security_footer_length, message_bytes = parts
             is_add_security_trace = True
         else:
             return
diff --git a/tools/gnmsg/client_messages.py b/tools/gnmsg/client_messages.py
index 6f8b9e1..a6f03ae 100644
--- a/tools/gnmsg/client_messages.py
+++ b/tools/gnmsg/client_messages.py
@@ -265,6 +265,16 @@ def read_close_connection_message(properties, message_bytes, offset):
     properties["ObjectPart"] = object_part
 
 
+def read_contains_key_message(properties, message_bytes, offset):
+    (properties["RegionPart"], offset) = parse_region_part(message_bytes, offset)
+    (properties["Key"], offset) = parse_key_or_value(message_bytes, offset)
+    (request_type, offset) = parse_raw_int_part(message_bytes, offset)
+    if request_type["Value"] == 1:
+        properties["RequestType"] == "ContainsValueForKey"
+    else:
+        properties["RequestType"] = "ContainsKey"
+
+
 def read_destroy_message(properties, message_bytes, offset):
     if properties["Parts"] > 5:
         raise Exception(
@@ -390,6 +400,7 @@ client_message_parsers = {
     "PUT": read_put_message,
     "REQUEST": read_request_message,
     "CLOSE_CONNECTION": read_close_connection_message,
+    "CONTAINS_KEY": read_contains_key_message,
     "DESTROY": read_destroy_message,
     "GET_CLIENT_PARTITION_ATTRIBUTES": read_get_client_partition_attributes_message,
     "GET_CLIENT_PR_METADATA": read_get_client_pr_metadata_message,
diff --git a/tools/gnmsg/command_line.py b/tools/gnmsg/command_line.py
index dac4c59..aea92d3 100755
--- a/tools/gnmsg/command_line.py
+++ b/tools/gnmsg/command_line.py
@@ -37,6 +37,8 @@ def parse_command_line():
         help="(optionally) print out regular message details",
     )
 
+    parser.add_argument("--thread_id", metavar="T", nargs="?", help="Show only messages on this thread")
+
     args = parser.parse_args()
 
     if args.file is None:
@@ -44,4 +46,4 @@ def parse_command_line():
         parser.print_help()
         sys.exit(1)
 
-    return (args.file, args.handshake, args.messages)
+    return (args.file, args.handshake, args.messages, args.thread_id)
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index 37ff72f..fd27b83 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -44,7 +44,7 @@ from server_message_decoder import ServerMessageDecoder
 from handshake_decoder import HandshakeDecoder
 
 
-def scan_file(filename, dump_handshake, dump_messages):
+def scan_file(filename, dump_handshake, dump_messages, thread_id):
     output_queue = queue.Queue()
     separator = ""
     if dump_handshake:
@@ -56,7 +56,7 @@ def scan_file(filename, dump_handshake, dump_messages):
                     data = output_queue.get_nowait()
                     for key, value in data.items():
                         if key == "handshake":
-                            print(separator + json.dumps(data, indent=2, default=str))
+                            print(separator + json.dumps(value, indent=2, default=str))
                             separator = ","
                 except queue.Empty:
                     continue
@@ -73,8 +73,14 @@ def scan_file(filename, dump_handshake, dump_messages):
                 data = output_queue.get_nowait()
                 for key, value in data.items():
                     if key == "message" and dump_messages:
-                        print(separator + json.dumps(data, indent=2, default=str))
-                        separator = ","
+                        if thread_id:
+                            if "tid" in value.keys() and value["tid"] == thread_id:
+                                print(separator + json.dumps(value, indent=2, default=str))
+                                separator = ","
+                        else:
+                            print(separator + json.dumps(value, indent=2, default=str))
+                            separator = ","
+
             except queue.Empty:
                 continue
             except:
@@ -92,5 +98,5 @@ def scan_file(filename, dump_handshake, dump_messages):
 
 
 if __name__ == "__main__":
-    (file, handshake, messages) = command_line.parse_command_line()
-    scan_file(file, handshake, messages)
+    (file, handshake, messages, thread_id) = command_line.parse_command_line()
+    scan_file(file, handshake, messages, thread_id)
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index b6fcc96..46ca955 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 import re
 import struct
+import sys
 
 from dateutil import parser
 
@@ -33,7 +34,7 @@ class ServerMessageDecoder(DecoderBase):
         self.receive_trace_parts_retriever_ = None
         self.receive_trace_parser_ = None
         self.connection_states_ = {}
-        self.last_header_ = {}
+        self.headers_ = {}
         self.nc_version_ = None
         self.get_receive_trace_parts_functions_ = {
             "0.0.42": self.get_receive_trace_header_base,
@@ -41,6 +42,9 @@ class ServerMessageDecoder(DecoderBase):
             "10.1.1": self.get_receive_trace_header_base,
             "10.1.2": self.get_receive_trace_header_base,
             "10.1.3": self.get_receive_trace_header_base,
+            "10.1.4": self.get_receive_trace_header_base,
+            "10.2.0": self.get_receive_trace_header_base,
+            "10.2.1": self.get_receive_trace_header_base,
             "9.1.1": self.get_receive_trace_header_v911,
         }
         self.receive_trace_parsers_ = {
@@ -49,9 +53,13 @@ class ServerMessageDecoder(DecoderBase):
             "10.1.1": self.parse_response_fields_base,
             "10.1.2": self.parse_response_fields_base,
             "10.1.3": self.parse_response_fields_base,
+            "10.1.4": self.parse_response_fields_base,
+            "10.2.0": self.parse_response_fields_base,
+            "10.2.1": self.parse_response_fields_base,
             "9.1.1": self.parse_response_fields_v911,
         }
         self.chunk_decoder = ChunkedResponseDecoder()
+        self.threads_connections_ = {}
 
     def search_for_version(self, line):
         if self.nc_version_ == None:
@@ -69,16 +77,34 @@ class ServerMessageDecoder(DecoderBase):
                     self.nc_version_
                 ]
 
+    def associate_connection_to_tid(self, line):
+        result = False
+        expression = re.compile(
+            r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+(\d+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+        )
+        match = expression.search(line)
+        if match:
+            tid = match.group(2)
+            connection = match.group(3)
+            self.threads_connections_[tid] = connection
+            result = True
+
+        return result
     def get_receive_trace_header_with_pointer(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readMessage:\s*\[([\d|a-f|A-F|x|X]+).*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage:\s*\[([\d|a-f|A-F|x|X]+).*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
-            parts.append(match.group(1))
-            parts.append(match.group(2))
-            parts.append(match.group(3))
+            date_time = match.group(1)
+            tid = match.group(2)
+            connection = match.group(3)
+            bytes = match.group(4)
+            parts.append(date_time)
+            parts.append(tid)
+            parts.append(connection)
+            parts.append(bytes)
             result = True
 
         return result
@@ -86,13 +112,18 @@ class ServerMessageDecoder(DecoderBase):
     def get_receive_trace_header_without_pointer(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
+            tid = match.group(2)
             parts.append(match.group(1))
-            parts.append("0")
-            parts.append(match.group(2))
+            parts.append(tid)
+            if tid in self.threads_connections_.keys():
+                parts.append(self.threads_connections_[tid])
+            else:
+                parts.append("0")
+            parts.append(match.group(3))
             result = True
 
         return result
@@ -107,13 +138,18 @@ class ServerMessageDecoder(DecoderBase):
     def get_receive_trace_header_v911(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readMessage: received header from endpoint.*bytes:\s*([\d| ]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage: received header from endpoint.*bytes:\s*([\d| ]+)"
         )
         match = expression.search(line)
         if match:
+            tid = match.group(2)
             parts.append(parser.parse(match.group(1)))
-            parts.append("0")
-            parts.append(match.group(2))
+            parts.append(parser.parse(tid))
+            if tid in self.threads_connections_.keys():
+                parts.append(self.threads_connections_[tid])
+            else:
+                parts.append("0")
+            parts.append(match.group(3))
             result = True
 
         return result
@@ -121,12 +157,17 @@ class ServerMessageDecoder(DecoderBase):
     def get_receive_trace_body_parts(self, line, parts):
         result = False
         expression = re.compile(
-            "received message body from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            ":\d+\s+(\d+)\]\s*TcrConnection::readMessage: received message body from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
-            message = match.group(1)
-            parts.append(message)
+            tid = match.group(1)
+            parts.append(tid)
+            if tid in self.threads_connections_.keys():
+                parts.append(self.threads_connections_[tid])
+            else:
+                parts.append("0")
+            parts.append(match.group(2))
             result = True
 
         return result
@@ -138,7 +179,7 @@ class ServerMessageDecoder(DecoderBase):
     def get_add_security_trace_parts(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
@@ -146,6 +187,7 @@ class ServerMessageDecoder(DecoderBase):
             parts.append(match.group(2))
             parts.append(match.group(3))
             parts.append(match.group(4))
+            parts.append(match.group(5))
             result = True
 
         return result
@@ -156,7 +198,7 @@ class ServerMessageDecoder(DecoderBase):
         result = False
 
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
@@ -164,18 +206,20 @@ class ServerMessageDecoder(DecoderBase):
             parts.append(match.group(2))
             parts.append(match.group(3))
             parts.append(match.group(4))
+            parts.append(match.group(5))
             result = True
 
         if not result:
             expression = re.compile(
-                r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+                r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
             )
             match = expression.search(line)
             if match:
                 parts.append(parser.parse(match.group(1)))
-                parts.append("")
                 parts.append(match.group(2))
+                parts.append("")
                 parts.append(match.group(3))
+                parts.append(match.group(4))
                 result = True
 
         return result
@@ -184,7 +228,7 @@ class ServerMessageDecoder(DecoderBase):
         # Check if this is a header for a chunked message
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader\(([0-9|a-f|A-F|x]+)\):\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader\(([0-9|a-f|A-F|x]+)\):\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
         )
         match = expression.search(line)
         if match:
@@ -192,18 +236,24 @@ class ServerMessageDecoder(DecoderBase):
             parts.append(match.group(2))
             parts.append(match.group(3))
             parts.append(match.group(4))
+            parts.append(match.group(5))
             result = True
 
         if not result:
             expression = re.compile(
-                r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+                r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
             )
             match = expression.search(line)
             if match:
                 parts.append(parser.parse(match.group(1)))
-                parts.append("")
-                parts.append(match.group(2))
+                tid = match.group(2)
+                parts.append(tid)
+                if tid in self.threads_connections_.keys():
+                    parts.append(self.threads_connections_[tid])
+                else:
+                    parts.append("0")
                 parts.append(match.group(3))
+                parts.append(match.group(4))
                 result = True
 
         return result
@@ -213,7 +263,7 @@ class ServerMessageDecoder(DecoderBase):
         # If it is, add it to the chunked decoder
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
@@ -221,18 +271,24 @@ class ServerMessageDecoder(DecoderBase):
             parts.append(match.group(2))
             parts.append(match.group(3))
             parts.append(match.group(4))
+            parts.append(match.group(5))
             result = True
 
         if not result:
             expression = re.compile(
-                r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+                r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
             )
             match = expression.search(line)
             if match:
                 parts.append(parser.parse(match.group(1)))
-                parts.append("")
-                parts.append(match.group(2))
+                tid = match.group(2)
+                parts.append(tid)
+                if tid in self.threads_connections_.keys():
+                    parts.append(self.threads_connections_[tid])
+                else:
+                    parts.append("0")
                 parts.append(match.group(3))
+                parts.append(match.group(4))
                 result = True
 
         return result
@@ -292,28 +348,43 @@ class ServerMessageDecoder(DecoderBase):
         message_bytes = None
         message_body = None
         chunk_bytes = None
+        tid = None
 
         self.search_for_version(line)
 
         parts = []
-        if self.get_receive_trace_parts(line, parts):
-            (
-                self.last_header_["Timestamp"],
-                self.last_header_["Connection"],
-                message_bytes,
-            ) = parts
+        if self.associate_connection_to_tid(line):
+            pass
+        elif self.get_receive_trace_parts(line, parts):
+            tid = parts[1]
+            last_header = {"Timestamp": parts[0],
+                           "tid": tid,
+                           "Connection": parts[2]}
+            message_bytes = parts[3]
+            self.headers_[tid] = last_header
+            connection = parts[2]
+            if connection in self.connection_states_.keys() and self.connection_states_[connection] != self.STATE_NEUTRAL_:
+                print("WARNING: Multiple headers rec'd without a message body.")
+            self.connection_states_[connection] = self.STATE_NEUTRAL_
         elif self.get_receive_trace_body_parts(line, parts):
-            message_body = parts[0]
-        elif self.get_add_security_trace_parts(line, parts):
+            tid = parts[0]
             connection = parts[1]
+            message_body = parts[2]
+        elif self.get_add_security_trace_parts(line, parts):
+            tid = parts[1]
+            connection = parts[2]
         elif self.get_response_header(line, parts):
-            self.chunk_decoder.add_header(parts[1], parts[3])
+            tid = parts[1]
+            connection = parts[2]
+            self.chunk_decoder.add_header(parts[2], parts[4])
         elif self.get_chunk_header(line, parts):
             flags = 0xff
             size = 0
-            (flags, size) = read_number_from_hex_string(parts[3], 2, len(parts[3]) - 2)
+            tid = parts[1]
+            (flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
             self.chunk_decoder.add_chunk_header(parts[2], flags)
         elif self.get_chunk_bytes(line, parts):
+            tid = parts[1]
             self.chunk_decoder.add_chunk(parts[3])
             if self.chunk_decoder.is_complete_message():
                 self.output_queue_.put({"message": self.chunk_decoder.get_decoded_message()})
@@ -326,14 +397,18 @@ class ServerMessageDecoder(DecoderBase):
 
         if self.connection_states_[connection] == self.STATE_NEUTRAL_:
             if message_bytes:
-                self.last_header_["Direction"] = "<---"
+
+                last_header = self.headers_[tid]
+                last_header["Direction"] = "<---"
                 (
-                    self.last_header_["Type"],
-                    self.last_header_["Length"],
-                    self.last_header_["Parts"],
-                    self.last_header_["TransactionId"],
-                    self.last_header_["SecurityFlag"],
+                    last_header["Type"],
+                    last_header["Length"],
+                    last_header["Parts"],
+                    last_header["TransactionId"],
+                    last_header["SecurityFlag"],
                 ) = self.parse_response_fields(message_bytes)
+                self.headers_[tid] = last_header
+
                 self.connection_states_[
                     connection
                 ] = self.STATE_WAITING_FOR_MESSAGE_BODY_
@@ -341,8 +416,9 @@ class ServerMessageDecoder(DecoderBase):
             self.connection_states_[connection] == self.STATE_WAITING_FOR_MESSAGE_BODY_
         ):
             if message_body:
-                receive_trace = self.last_header_
-                self.last_header_ = {}
+                receive_trace = self.headers_[tid]
+                self.headers_[tid] = None
                 parse_server_message(receive_trace, message_body)
                 self.connection_states_[connection] = self.STATE_NEUTRAL_
                 self.output_queue_.put({"message": receive_trace})
+