You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/11/22 20:40:26 UTC

[geode-native] branch develop updated: gnmsg - decode REGISTER_INTEREST message (#895)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 746f771  gnmsg - decode REGISTER_INTEREST message (#895)
746f771 is described below

commit 746f771674ab35e446668f6b32c9ca0282b793e2
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Mon Nov 22 12:40:17 2021 -0800

    gnmsg - decode REGISTER_INTEREST message (#895)
    
    * Make log-rolling work with relative paths
    * Don't report individual chunk stats, just stats for the complete
    response.
    * Parse REGISTER_INTEREST client message details
    * Tiny performance improvement to regex parsing
      - Really need to spend time tuning these
---
 tools/gnmsg/chunked_message_decoder.py | 35 ++++++++++----------
 tools/gnmsg/client_message_decoder.py  |  2 +-
 tools/gnmsg/client_messages.py         | 60 ++++++++++++++++++++++++++++++++++
 tools/gnmsg/gnmsg.py                   | 18 +++++++---
 tools/gnmsg/interest_policy.py         | 20 ++++++++++++
 tools/gnmsg/interest_type.py           | 22 +++++++++++++
 tools/gnmsg/server_message_decoder.py  | 34 ++++++++++---------
 7 files changed, 152 insertions(+), 39 deletions(-)

diff --git a/tools/gnmsg/chunked_message_decoder.py b/tools/gnmsg/chunked_message_decoder.py
index ef3330c..e030c46 100644
--- a/tools/gnmsg/chunked_message_decoder.py
+++ b/tools/gnmsg/chunked_message_decoder.py
@@ -27,7 +27,7 @@ class ChunkedResponseDecoder:
         self.reset()
 
     def add_header(self, connection, header):
-        if len(self.chunked_message) > 0:
+        if len(self.chunked_message) > 2:
             raise Exception(
                 "Previous chunked message is not completed, can't process another header"
             )
@@ -52,7 +52,6 @@ class ChunkedResponseDecoder:
             flags = 0
             (chunk_size, offset) = call_reader_function(header, offset, read_int_value)
             (flags, offset) = call_reader_function(header, offset, read_byte_value)
-            self.chunked_message["ChunkInfo"] = []
             self.add_chunk_header(chunk_size, flags)
         else:
             raise IndexError(
@@ -62,22 +61,23 @@ class ChunkedResponseDecoder:
             )
 
     def add_chunk_header(self, chunk_size, flags):
-        self.chunk_index += 1
-        if len(self.chunked_message) == 0:
+        if len(self.chunked_message) == 2:
             raise Exception("Can't add chunk header before message header")
 
-        key = "Chunk" + str(self.chunk_index)
-        inner_item = dict(ChunkLength=int(chunk_size), Flags=flags)
-        outer_item = {}
-        outer_item[key] = inner_item
-        self.chunked_message["ChunkInfo"].append(outer_item)
+        #
+        # Chunked messages can be *really* large, like several tens of
+        # thousands of chunks for a really huge response.  We used to
+        # report out a list of all the chunk sizes based on all the
+        # chunk headers, but it presented performance issues and really
+        # wasn't conveying particularly interesting info.  Now, we just
+        # tally up all the chunk sizes and report the total size of the
+        # response and the timestamp of the last incoming chunk, both
+        # of which are potentially much more meaningful than the details
+        # of each individual chunk.
+        #
         self.chunk_flags = flags
-
-    def add_chunk(self, chunk):
-        if len(self.chunked_message) == 0:
-            raise Exception("Can't add chunks before message header")
-
-        self.message_body += chunk
+        self.chunked_message["ResponseSize"] += chunk_size
+        self.chunked_message["NumberOfChunks"] += 1
 
     def is_complete_message(self):
         return self.chunk_flags & 0x1
@@ -91,8 +91,7 @@ class ChunkedResponseDecoder:
 
     def reset(self):
         self.header = ""
-        self.message_body = ""
-        self.chunked_message = {}
+        # self.message_body = ""
+        self.chunked_message = {"ResponseSize": 0, "NumberOfChunks": 0}
         self.complete = False
         self.chunk_flags = 0xFF
-        self.chunk_index = -1
diff --git a/tools/gnmsg/client_message_decoder.py b/tools/gnmsg/client_message_decoder.py
index 6d9fd30..4895dcc 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -87,7 +87,7 @@ class ClientMessageDecoder(DecoderBase):
         )
 
         self.send_trace_expression_base_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*(.+)"
         )
 
     def get_send_trace_parts_v911(self, line, parts):
diff --git a/tools/gnmsg/client_messages.py b/tools/gnmsg/client_messages.py
index 19980d1..3ff4428 100644
--- a/tools/gnmsg/client_messages.py
+++ b/tools/gnmsg/client_messages.py
@@ -16,6 +16,7 @@
 import sys
 import json
 
+from ds_codes import ds_codes
 from read_values import (
     call_reader_function,
     parse_key_or_value,
@@ -30,6 +31,8 @@ from read_values import (
 )
 from read_parts import read_object_header, read_int_part
 from numeric_conversion import decimal_string_to_hex_string, int_to_hex_string
+from interest_policy import interest_policy
+from interest_type import interest_type
 
 CHARS_IN_MESSAGE_HEADER = 34
 
@@ -48,6 +51,19 @@ def parse_region_part(message_bytes, offset):
     return (region_part, offset)
 
 
+def parse_regex_part(message_bytes, offset):
+    regex_part = {}
+    (regex_part["Size"], offset) = call_reader_function(
+        message_bytes, offset, read_int_value
+    )
+    (regex_part["IsObject"], offset) = call_reader_function(
+        message_bytes, offset, read_byte_value
+    )
+    (regex_part["Expression"], offset) = read_string_value(
+        message_bytes, regex_part["Size"], offset
+    )
+    return (regex_part, offset)
+
 def parse_object_part(message_bytes, offset):
     object_part = {}
     (object_part["Size"], offset) = call_reader_function(
@@ -227,6 +243,31 @@ def parse_byte_and_timeout_part(message_bytes, offset):
     return (byte_and_timeout_part, offset)
 
 
+def parse_interest_result_policy_part(message_bytes, offset):
+    interest_result_policy_part = {}
+    (interest_result_policy_part["Size"], offset) = call_reader_function(
+        message_bytes, offset, read_int_value
+    )
+    (interest_result_policy_part["IsObject"], offset) = call_reader_function(
+        message_bytes, offset, read_byte_value
+    )
+    dscode, offset = call_reader_function(message_bytes, offset, read_byte_value)
+    interest_result_policy_part["DSCode1"] = ds_codes[dscode]
+
+    dscode, offset = call_reader_function(message_bytes, offset, read_byte_value)
+    interest_result_policy_part["DSCode2"] = ds_codes[dscode]
+
+    policy, offset = call_reader_function(message_bytes, offset, read_byte_value)
+    interest_result_policy_part["Policy"] = interest_policy[policy]
+
+    return (interest_result_policy_part, offset)
+
+def parse_interest_type_part(message_bytes, offset):
+    value, offset = parse_raw_int_part(message_bytes, offset)
+    value["InterestType"] = interest_type[value["Value"]]
+    del value["Value"]
+    return value, offset
+
 def read_put_message(properties, message_bytes, offset):
     (properties["Region"], offset) = parse_region_part(message_bytes, offset)
     (properties["Operation"], offset) = parse_operation_part(message_bytes, offset)
@@ -434,6 +475,24 @@ def read_add_pdx_type_message(properties, message_bytes, offset):
     properties["PdxType"], offset = read_object_as_raw_bytes(message_bytes, offset)
     properties["TypeId"], offset = read_int_part(message_bytes, offset)
 
+def read_register_interest_message(properties, message_bytes, offset):
+    properties["Region"], offset = parse_region_part(message_bytes, offset)
+    properties["InterestType"], offset = parse_interest_type_part(message_bytes, offset)
+    properties["InterestResultPolicy"], offset = parse_interest_result_policy_part(message_bytes, offset)
+    properties["IsDurable"], offset = parse_raw_byte_part(message_bytes, offset)
+    properties["Regex"], offset = parse_regex_part(message_bytes, offset)
+
+    properties["Param1"], offset = read_object_as_raw_bytes(message_bytes, offset)
+    byte_values = properties["Param1"]["Bytes"]
+    del properties["Param1"]["Bytes"]
+    properties["Param1"]["ReceiveValues"] = int(byte_values)
+
+    properties["Param2"], offset = read_object_as_raw_bytes(message_bytes, offset)
+    byte_values = properties["Param2"]["Bytes"]
+    del properties["Param2"]["Bytes"]
+    caching_enabled, serialize_values = byte_values.split(" ")
+    properties["Param2"]["CachingEnabled"] = int(caching_enabled)
+    properties["Param2"]["SerializeValues"] = int(serialize_values)
 
 client_message_parsers = {
     "ADD_PDX_TYPE": read_add_pdx_type_message,
@@ -452,6 +511,7 @@ client_message_parsers = {
     "KEY_SET": read_key_set,
     "PUT": read_put_message,
     "QUERY": read_query_message,
+    "REGISTER_INTEREST": read_register_interest_message,
     "REQUEST": read_request_message,
     "STOPCQ_MSG_TYPE": read_stopcq_or_closecq_msg_type_message,
     "USER_CREDENTIAL_MESSAGE": read_user_credential_message,
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index 999824f..3b46257 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -61,7 +61,7 @@ def scan_opened_file(
     if dump_handshake:
         handshake_decoder = HandshakeDecoder(output_queue)
         for line in file:
-            handshake_decoder.process_line(line.decode("utf-8"))
+            handshake_decoder.process_line(line.decode("utf-8").rstrip())
             try:
                 data = output_queue.get_nowait()
                 for key, value in data.items():
@@ -73,7 +73,7 @@ def scan_opened_file(
 
     separator = start_string
     for line in file:
-        linestr = line.decode("utf-8")
+        linestr = line.decode("utf-8").rstrip()
         client_decoder.process_line(linestr)
         server_decoder.process_line(linestr)
         try:
@@ -148,13 +148,19 @@ def scan_file_sequence(file, handshake, messages, thread_id):
 
         while True:
             if last_chance:
-                filename = dirname + os.sep + root + ext
+                if len(dirname) > 0:
+                    filename = dirname + os.sep + root + ext
+                else:
+                    filename = root + ext
             else:
-                filename = dirname + os.sep + root + "-" + str(roll_index) + ext
+                if len(dirname) > 0:
+                    filename = dirname + os.sep + root + "-" + str(roll_index) + ext
+                else:
+                    filename = root + "-" + str(roll_index) + ext
 
             try:
-                f = open(filename, "rb")
                 print("Scanning " + filename, file=sys.stderr)
+                f = open(filename, "rb")
                 scan_opened_file(
                     f,
                     handshake_decoder,
@@ -172,6 +178,8 @@ def scan_file_sequence(file, handshake, messages, thread_id):
                 if last_chance:
                     break
             except FileNotFoundError:
+                if last_chance:
+                    break
                 last_chance = True
                 continue
 
diff --git a/tools/gnmsg/interest_policy.py b/tools/gnmsg/interest_policy.py
new file mode 100644
index 0000000..50fb0e4
--- /dev/null
+++ b/tools/gnmsg/interest_policy.py
@@ -0,0 +1,20 @@
+#!/usr/local/bin/python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+interest_policy = {
+    0: "NONE",
+    1: "KEYS",
+    2: "KEYS/VALUES",
+}
diff --git a/tools/gnmsg/interest_type.py b/tools/gnmsg/interest_type.py
new file mode 100644
index 0000000..b7ce768
--- /dev/null
+++ b/tools/gnmsg/interest_type.py
@@ -0,0 +1,22 @@
+#!/usr/local/bin/python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+interest_type = {
+    0: "KEY",
+    1: "REGULAR_EXPRESSION",
+    2: "FILTER_CLASS",
+    3: "OQL_QUERY",
+    4: "CQ"
+}
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index ba03f40..ca4ce47 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -51,15 +51,15 @@ class ServerMessageDecoder(DecoderBase):
         self.threads_connections_ = {}
 
         self.connection_to_tid_expression_ = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*(.+)"
         )
 
         self.trace_header_with_pointer_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage\(([\d|a-f|A-F|x|X]+)\):.*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage\(([\d|a-f|A-F|x|X]+)\):.*received header from endpoint.*bytes:\s*(.+)"
         )
 
         self.trace_header_without_pointer_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*(.+)"
         )
 
         self.trace_header_v911_expression_ = re.compile(
@@ -67,19 +67,19 @@ class ServerMessageDecoder(DecoderBase):
         )
 
         self.receive_trace_body_expression_ = re.compile(
-            ":\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage: received message body from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            ":\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage: received message body from endpoint.*bytes:\s*(.+)"
         )
 
         self.security_trace_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*(.+)"
         )
 
         self.response_header_with_pointer_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
         )
 
         self.response_header_without_pointer_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
         )
 
         self.chunk_header_with_pointer_expression_ = re.compile(
@@ -87,14 +87,14 @@ class ServerMessageDecoder(DecoderBase):
         )
 
         self.chunk_header_without_pointer_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=(.+)"
         )
 
         self.chunk_bytes_with_pointer_expression_ = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
         )
         self.chunk_bytes_without_pointer_expression_ = re.compile(
-            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
         )
 
     def associate_connection_to_tid(self, line):
@@ -381,17 +381,21 @@ class ServerMessageDecoder(DecoderBase):
             else:
                 self.chunk_decoders_[tid] = ChunkedResponseDecoder()
                 self.chunk_decoders_[tid].add_header(parts[2], parts[4])
+
+            if self.chunk_decoders_[tid].is_complete_message():
+                receive_trace = self.chunk_decoders_[tid].get_decoded_message(
+                    parts[0]
+                )
+                receive_trace["tid"] = str(tid)
+                self.output_queue_.put({"message": receive_trace})
+                self.chunk_decoders_[tid].reset()
         elif self.get_chunk_header(line, parts):
             flags = 0xFF
             size = 0
             tid = parts[1]
             (flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
             if tid in self.chunk_decoders_.keys():
-                self.chunk_decoders_[tid].add_chunk_header(parts[3], flags)
-        elif self.get_chunk_bytes(line, parts):
-            tid = parts[1]
-            if tid in self.chunk_decoders_.keys():
-                self.chunk_decoders_[tid].add_chunk(parts[3])
+                self.chunk_decoders_[tid].add_chunk_header(int(parts[3]), flags)
                 if self.chunk_decoders_[tid].is_complete_message():
                     receive_trace = self.chunk_decoders_[tid].get_decoded_message(
                         parts[0]