You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/11/22 20:40:26 UTC
[geode-native] branch develop updated: gnmsg - decode REGISTER_INTEREST message (#895)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 746f771 gnmsg - decode REGISTER_INTEREST message (#895)
746f771 is described below
commit 746f771674ab35e446668f6b32c9ca0282b793e2
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Mon Nov 22 12:40:17 2021 -0800
gnmsg - decode REGISTER_INTEREST message (#895)
* Make log-rolling work with relative paths
* Don't report individual chunk stats, just stats for the complete
response.
* Parse REGISTER_INTEREST client message details
* Tiny performance improvement to regex parsing
- Really need to spend time tuning these
---
tools/gnmsg/chunked_message_decoder.py | 35 ++++++++++----------
tools/gnmsg/client_message_decoder.py | 2 +-
tools/gnmsg/client_messages.py | 60 ++++++++++++++++++++++++++++++++++
tools/gnmsg/gnmsg.py | 18 +++++++---
tools/gnmsg/interest_policy.py | 20 ++++++++++++
tools/gnmsg/interest_type.py | 22 +++++++++++++
tools/gnmsg/server_message_decoder.py | 34 ++++++++++---------
7 files changed, 152 insertions(+), 39 deletions(-)
diff --git a/tools/gnmsg/chunked_message_decoder.py b/tools/gnmsg/chunked_message_decoder.py
index ef3330c..e030c46 100644
--- a/tools/gnmsg/chunked_message_decoder.py
+++ b/tools/gnmsg/chunked_message_decoder.py
@@ -27,7 +27,7 @@ class ChunkedResponseDecoder:
self.reset()
def add_header(self, connection, header):
- if len(self.chunked_message) > 0:
+ if len(self.chunked_message) > 2:
raise Exception(
"Previous chunked message is not completed, can't process another header"
)
@@ -52,7 +52,6 @@ class ChunkedResponseDecoder:
flags = 0
(chunk_size, offset) = call_reader_function(header, offset, read_int_value)
(flags, offset) = call_reader_function(header, offset, read_byte_value)
- self.chunked_message["ChunkInfo"] = []
self.add_chunk_header(chunk_size, flags)
else:
raise IndexError(
@@ -62,22 +61,23 @@ class ChunkedResponseDecoder:
)
def add_chunk_header(self, chunk_size, flags):
- self.chunk_index += 1
- if len(self.chunked_message) == 0:
+ if len(self.chunked_message) == 2:
raise Exception("Can't add chunk header before message header")
- key = "Chunk" + str(self.chunk_index)
- inner_item = dict(ChunkLength=int(chunk_size), Flags=flags)
- outer_item = {}
- outer_item[key] = inner_item
- self.chunked_message["ChunkInfo"].append(outer_item)
+ #
+ # Chunked messages can be *really* large, like several tens of
+ # thousands of chunks for a really huge response. We used to
+ # report out a list of all the chunk sizes based on all the
+ # chunk headers, but it presented performance issues and really
+ # wasn't conveying particularly interesting info. Now, we just
+ # tally up all the chunk sizes and report the total size of the
+ # response and the timestamp of the last incoming chunk, both
+ # of which are potentially much more meaningful than the details
+ # of each individual chunk.
+ #
self.chunk_flags = flags
-
- def add_chunk(self, chunk):
- if len(self.chunked_message) == 0:
- raise Exception("Can't add chunks before message header")
-
- self.message_body += chunk
+ self.chunked_message["ResponseSize"] += chunk_size
+ self.chunked_message["NumberOfChunks"] += 1
def is_complete_message(self):
return self.chunk_flags & 0x1
@@ -91,8 +91,7 @@ class ChunkedResponseDecoder:
def reset(self):
self.header = ""
- self.message_body = ""
- self.chunked_message = {}
+ # self.message_body = ""
+ self.chunked_message = {"ResponseSize": 0, "NumberOfChunks": 0}
self.complete = False
self.chunk_flags = 0xFF
- self.chunk_index = -1
diff --git a/tools/gnmsg/client_message_decoder.py b/tools/gnmsg/client_message_decoder.py
index 6d9fd30..4895dcc 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -87,7 +87,7 @@ class ClientMessageDecoder(DecoderBase):
)
self.send_trace_expression_base_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*(.+)"
)
def get_send_trace_parts_v911(self, line, parts):
diff --git a/tools/gnmsg/client_messages.py b/tools/gnmsg/client_messages.py
index 19980d1..3ff4428 100644
--- a/tools/gnmsg/client_messages.py
+++ b/tools/gnmsg/client_messages.py
@@ -16,6 +16,7 @@
import sys
import json
+from ds_codes import ds_codes
from read_values import (
call_reader_function,
parse_key_or_value,
@@ -30,6 +31,8 @@ from read_values import (
)
from read_parts import read_object_header, read_int_part
from numeric_conversion import decimal_string_to_hex_string, int_to_hex_string
+from interest_policy import interest_policy
+from interest_type import interest_type
CHARS_IN_MESSAGE_HEADER = 34
@@ -48,6 +51,19 @@ def parse_region_part(message_bytes, offset):
return (region_part, offset)
+def parse_regex_part(message_bytes, offset):
+ regex_part = {}
+ (regex_part["Size"], offset) = call_reader_function(
+ message_bytes, offset, read_int_value
+ )
+ (regex_part["IsObject"], offset) = call_reader_function(
+ message_bytes, offset, read_byte_value
+ )
+ (regex_part["Expression"], offset) = read_string_value(
+ message_bytes, regex_part["Size"], offset
+ )
+ return (regex_part, offset)
+
def parse_object_part(message_bytes, offset):
object_part = {}
(object_part["Size"], offset) = call_reader_function(
@@ -227,6 +243,31 @@ def parse_byte_and_timeout_part(message_bytes, offset):
return (byte_and_timeout_part, offset)
+def parse_interest_result_policy_part(message_bytes, offset):
+ interest_result_policy_part = {}
+ (interest_result_policy_part["Size"], offset) = call_reader_function(
+ message_bytes, offset, read_int_value
+ )
+ (interest_result_policy_part["IsObject"], offset) = call_reader_function(
+ message_bytes, offset, read_byte_value
+ )
+ dscode, offset = call_reader_function(message_bytes, offset, read_byte_value)
+ interest_result_policy_part["DSCode1"] = ds_codes[dscode]
+
+ dscode, offset = call_reader_function(message_bytes, offset, read_byte_value)
+ interest_result_policy_part["DSCode2"] = ds_codes[dscode]
+
+ policy, offset = call_reader_function(message_bytes, offset, read_byte_value)
+ interest_result_policy_part["Policy"] = interest_policy[policy]
+
+ return (interest_result_policy_part, offset)
+
+def parse_interest_type_part(message_bytes, offset):
+ value, offset = parse_raw_int_part(message_bytes, offset)
+ value["InterestType"] = interest_type[value["Value"]]
+ del value["Value"]
+ return value, offset
+
def read_put_message(properties, message_bytes, offset):
(properties["Region"], offset) = parse_region_part(message_bytes, offset)
(properties["Operation"], offset) = parse_operation_part(message_bytes, offset)
@@ -434,6 +475,24 @@ def read_add_pdx_type_message(properties, message_bytes, offset):
properties["PdxType"], offset = read_object_as_raw_bytes(message_bytes, offset)
properties["TypeId"], offset = read_int_part(message_bytes, offset)
+def read_register_interest_message(properties, message_bytes, offset):
+ properties["Region"], offset = parse_region_part(message_bytes, offset)
+ properties["InterestType"], offset = parse_interest_type_part(message_bytes, offset)
+ properties["InterestResultPolicy"], offset = parse_interest_result_policy_part(message_bytes, offset)
+ properties["IsDurable"], offset = parse_raw_byte_part(message_bytes, offset)
+ properties["Regex"], offset = parse_regex_part(message_bytes, offset)
+
+ properties["Param1"], offset = read_object_as_raw_bytes(message_bytes, offset)
+ byte_values = properties["Param1"]["Bytes"]
+ del properties["Param1"]["Bytes"]
+ properties["Param1"]["ReceiveValues"] = int(byte_values)
+
+ properties["Param2"], offset = read_object_as_raw_bytes(message_bytes, offset)
+ byte_values = properties["Param2"]["Bytes"]
+ del properties["Param2"]["Bytes"]
+ caching_enabled, serialize_values = byte_values.split(" ")
+ properties["Param2"]["CachingEnabled"] = int(caching_enabled)
+ properties["Param2"]["SerializeValues"] = int(serialize_values)
client_message_parsers = {
"ADD_PDX_TYPE": read_add_pdx_type_message,
@@ -452,6 +511,7 @@ client_message_parsers = {
"KEY_SET": read_key_set,
"PUT": read_put_message,
"QUERY": read_query_message,
+ "REGISTER_INTEREST": read_register_interest_message,
"REQUEST": read_request_message,
"STOPCQ_MSG_TYPE": read_stopcq_or_closecq_msg_type_message,
"USER_CREDENTIAL_MESSAGE": read_user_credential_message,
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index 999824f..3b46257 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -61,7 +61,7 @@ def scan_opened_file(
if dump_handshake:
handshake_decoder = HandshakeDecoder(output_queue)
for line in file:
- handshake_decoder.process_line(line.decode("utf-8"))
+ handshake_decoder.process_line(line.decode("utf-8").rstrip())
try:
data = output_queue.get_nowait()
for key, value in data.items():
@@ -73,7 +73,7 @@ def scan_opened_file(
separator = start_string
for line in file:
- linestr = line.decode("utf-8")
+ linestr = line.decode("utf-8").rstrip()
client_decoder.process_line(linestr)
server_decoder.process_line(linestr)
try:
@@ -148,13 +148,19 @@ def scan_file_sequence(file, handshake, messages, thread_id):
while True:
if last_chance:
- filename = dirname + os.sep + root + ext
+ if len(dirname) > 0:
+ filename = dirname + os.sep + root + ext
+ else:
+ filename = root + ext
else:
- filename = dirname + os.sep + root + "-" + str(roll_index) + ext
+ if len(dirname) > 0:
+ filename = dirname + os.sep + root + "-" + str(roll_index) + ext
+ else:
+ filename = root + "-" + str(roll_index) + ext
try:
- f = open(filename, "rb")
print("Scanning " + filename, file=sys.stderr)
+ f = open(filename, "rb")
scan_opened_file(
f,
handshake_decoder,
@@ -172,6 +178,8 @@ def scan_file_sequence(file, handshake, messages, thread_id):
if last_chance:
break
except FileNotFoundError:
+ if last_chance:
+ break
last_chance = True
continue
diff --git a/tools/gnmsg/interest_policy.py b/tools/gnmsg/interest_policy.py
new file mode 100644
index 0000000..50fb0e4
--- /dev/null
+++ b/tools/gnmsg/interest_policy.py
@@ -0,0 +1,20 @@
+#!/usr/local/bin/python3
+
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+interest_policy = {
+ 0: "NONE",
+ 1: "KEYS",
+ 2: "KEYS/VALUES",
+}
diff --git a/tools/gnmsg/interest_type.py b/tools/gnmsg/interest_type.py
new file mode 100644
index 0000000..b7ce768
--- /dev/null
+++ b/tools/gnmsg/interest_type.py
@@ -0,0 +1,22 @@
+#!/usr/local/bin/python3
+
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+interest_type = {
+ 0: "KEY",
+ 1: "REGULAR_EXPRESSION",
+ 2: "FILTER_CLASS",
+ 3: "OQL_QUERY",
+ 4: "CQ"
+}
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index ba03f40..ca4ce47 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -51,15 +51,15 @@ class ServerMessageDecoder(DecoderBase):
self.threads_connections_ = {}
self.connection_to_tid_expression_ = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::send:\s*\[([\d|a-f|A-F|x|X]+).*sending request to endpoint.*bytes:\s*(.+)"
)
self.trace_header_with_pointer_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage\(([\d|a-f|A-F|x|X]+)\):.*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).+:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage\(([\d|a-f|A-F|x|X]+)\):.*received header from endpoint.*bytes:\s*(.+)"
)
self.trace_header_without_pointer_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage:\s*received header from endpoint.*bytes:\s*(.+)"
)
self.trace_header_v911_expression_ = re.compile(
@@ -67,19 +67,19 @@ class ServerMessageDecoder(DecoderBase):
)
self.receive_trace_body_expression_ = re.compile(
- ":\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage: received message body from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ ":\d+\s+([\d|a-f|A-F|x|X]+)\]\s*TcrConnection::readMessage: received message body from endpoint.*bytes:\s*(.+)"
)
self.security_trace_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrMessage::addSecurityPart\s*\[(0x[\d|a-f|A-F]*).*length\s*=\s*(\d+)\s*,\s*encrypted\s+ID\s*=\s*(.+)"
)
self.response_header_with_pointer_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
)
self.response_header_without_pointer_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
)
self.chunk_header_with_pointer_expression_ = re.compile(
@@ -87,14 +87,14 @@ class ServerMessageDecoder(DecoderBase):
)
self.chunk_header_without_pointer_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=(.+)"
)
self.chunk_bytes_with_pointer_expression_ = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
)
self.chunk_bytes_without_pointer_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*(.+)"
)
def associate_connection_to_tid(self, line):
@@ -381,17 +381,21 @@ class ServerMessageDecoder(DecoderBase):
else:
self.chunk_decoders_[tid] = ChunkedResponseDecoder()
self.chunk_decoders_[tid].add_header(parts[2], parts[4])
+
+ if self.chunk_decoders_[tid].is_complete_message():
+ receive_trace = self.chunk_decoders_[tid].get_decoded_message(
+ parts[0]
+ )
+ receive_trace["tid"] = str(tid)
+ self.output_queue_.put({"message": receive_trace})
+ self.chunk_decoders_[tid].reset()
elif self.get_chunk_header(line, parts):
flags = 0xFF
size = 0
tid = parts[1]
(flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
if tid in self.chunk_decoders_.keys():
- self.chunk_decoders_[tid].add_chunk_header(parts[3], flags)
- elif self.get_chunk_bytes(line, parts):
- tid = parts[1]
- if tid in self.chunk_decoders_.keys():
- self.chunk_decoders_[tid].add_chunk(parts[3])
+ self.chunk_decoders_[tid].add_chunk_header(int(parts[3]), flags)
if self.chunk_decoders_[tid].is_complete_message():
receive_trace = self.chunk_decoders_[tid].get_decoded_message(
parts[0]