You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/08/12 20:44:24 UTC

[geode-native] branch develop updated: gnmsg - Fix chunked decoding in some log files (#840)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6f8d99e  gnmsg - Fix chunked decoding in some log files (#840)
6f8d99e is described below

commit 6f8d99e75d4f396df4ccf7d51c5a56f117960703
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Thu Aug 12 13:44:18 2021 -0700

    gnmsg - Fix chunked decoding in some log files (#840)
    
    - Chunked messages can come in on multiple threads simultaneously, and
      gnmsg only had one decoder.  If two message headers came in on
      different threads prior to the message body arriving for the first
      thread, gnsmg would get confused and think it had received two headers
      without a body, throwing an exception.  Now we keep a chunk decoder
      per thread, and things run much more smoothly.
---
 tools/gnmsg/server_message_decoder.py | 45 +++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index e473b0a..135f95b 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -47,7 +47,7 @@ class ServerMessageDecoder(DecoderBase):
             self.parse_response_fields_base,
             self.parse_response_fields_v911,
         ]
-        self.chunk_decoder = ChunkedResponseDecoder()
+        self.chunk_decoders_ = {}
         self.threads_connections_ = {}
 
         self.connection_to_tid_expression_ = re.compile(
@@ -360,13 +360,12 @@ class ServerMessageDecoder(DecoderBase):
             last_header = {"Timestamp": parts[0], "tid": tid, "Connection": parts[2]}
             message_bytes = parts[3]
             self.headers_[tid] = last_header
-            connection = parts[2]
             if (
-                connection in self.connection_states_.keys()
-                and self.connection_states_[connection] != self.STATE_NEUTRAL_
+                tid in self.connection_states_.keys()
+                and self.connection_states_[tid] != self.STATE_NEUTRAL_
             ):
                 print("WARNING: Multiple headers rec'd without a message body.")
-            self.connection_states_[connection] = self.STATE_NEUTRAL_
+            self.connection_states_[tid] = self.STATE_NEUTRAL_
         elif self.get_receive_trace_body_parts(line, parts):
             tid = parts[0]
             connection = parts[1]
@@ -377,28 +376,34 @@ class ServerMessageDecoder(DecoderBase):
         elif self.get_response_header(line, parts):
             tid = parts[1]
             connection = parts[2]
-            self.chunk_decoder.add_header(parts[2], parts[4])
+            if tid in self.chunk_decoders_.keys():
+                self.chunk_decoders_[tid].add_header(parts[2], parts[4])
+            else:
+                self.chunk_decoders_[tid] = ChunkedResponseDecoder()
+                self.chunk_decoders_[tid].add_header(parts[2], parts[4])
         elif self.get_chunk_header(line, parts):
             flags = 0xFF
             size = 0
             tid = parts[1]
             (flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
-            self.chunk_decoder.add_chunk_header(parts[3], flags)
+            if tid in self.chunk_decoders_.keys():
+                self.chunk_decoders_[tid].add_chunk_header(parts[3], flags)
         elif self.get_chunk_bytes(line, parts):
             tid = parts[1]
-            self.chunk_decoder.add_chunk(parts[3])
-            if self.chunk_decoder.is_complete_message():
-                self.output_queue_.put(
-                    {"message": self.chunk_decoder.get_decoded_message(parts[0])}
-                )
-                self.chunk_decoder.reset()
+            if tid in self.chunk_decoders_.keys():
+                self.chunk_decoders_[tid].add_chunk(parts[3])
+                if self.chunk_decoders_[tid].is_complete_message():
+                    self.output_queue_.put(
+                        {"message": self.chunk_decoders_[tid].get_decoded_message(parts[0])}
+                    )
+                    self.chunk_decoders_[tid].reset()
         else:
             return
 
-        if connection not in self.connection_states_:
-            self.connection_states_[connection] = self.STATE_NEUTRAL_
+        if tid not in self.connection_states_:
+            self.connection_states_[tid] = self.STATE_NEUTRAL_
 
-        if self.connection_states_[connection] == self.STATE_NEUTRAL_:
+        if self.connection_states_[tid] == self.STATE_NEUTRAL_:
             if message_bytes:
 
                 last_header = self.headers_[tid]
@@ -412,15 +417,13 @@ class ServerMessageDecoder(DecoderBase):
                 ) = self.parse_response_fields(message_bytes)
                 self.headers_[tid] = last_header
 
-                self.connection_states_[
-                    connection
-                ] = self.STATE_WAITING_FOR_MESSAGE_BODY_
+                self.connection_states_[tid] = self.STATE_WAITING_FOR_MESSAGE_BODY_
         elif (
-            self.connection_states_[connection] == self.STATE_WAITING_FOR_MESSAGE_BODY_
+            self.connection_states_[tid] == self.STATE_WAITING_FOR_MESSAGE_BODY_
         ):
             if message_body:
                 receive_trace = self.headers_[tid]
                 self.headers_[tid] = None
                 parse_server_message(receive_trace, message_body)
-                self.connection_states_[connection] = self.STATE_NEUTRAL_
+                self.connection_states_[tid] = self.STATE_NEUTRAL_
                 self.output_queue_.put({"message": receive_trace})