You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/01/29 18:26:20 UTC
[geode-native] branch develop updated: GEODE-8871: parse server
response messages for PUT and CONTAINS_KEY (#730)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 6391584 GEODE-8871: parse server response messages for PUT and CONTAINS_KEY (#730)
6391584 is described below
commit 6391584c195a85714fff62cfa1e0c1d0ea6de6aa
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Fri Jan 29 10:26:11 2021 -0800
GEODE-8871: parse server response messages for PUT and CONTAINS_KEY (#730)
- parse server response messages for PUT and CONTAINS_KEY
- Fix bad regex (copy/paste error)
- Also some formatting fixes
---
tools/gnmsg/client_message_decoder.py | 17 +++-
tools/gnmsg/client_messages.py | 17 +++-
tools/gnmsg/command_line.py | 4 +-
tools/gnmsg/gnmsg.py | 4 +-
.../{numeric_conversion.py => gnmsg_globals.py} | 24 ++----
tools/gnmsg/numeric_conversion.py | 6 ++
.../{numeric_conversion.py => protocol_state.py} | 32 ++++----
tools/gnmsg/read_values.py | 12 ++-
tools/gnmsg/server_message_decoder.py | 37 +++++----
tools/gnmsg/server_messages.py | 91 +++++++++++++++++++++-
10 files changed, 177 insertions(+), 67 deletions(-)
diff --git a/tools/gnmsg/client_message_decoder.py b/tools/gnmsg/client_message_decoder.py
index 4631747..0c1aaa9 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -22,6 +22,7 @@ from client_messages import parse_client_message
from decoder_base import DecoderBase
from message_types import message_types
from numeric_conversion import to_hex_digit
+from gnmsg_globals import global_protocol_state
class ClientMessageDecoder(DecoderBase):
@@ -84,12 +85,13 @@ class ClientMessageDecoder(DecoderBase):
"PERIODIC_ACK",
"PING",
"REQUEST_EVENT_VALUE",
- "ROLLBACK"
+ "ROLLBACK",
"SIZE",
"TX_FAILOVER",
"TX_SYNCHRONIZATION",
"USER_CREDENTIAL_MESSAGE",
]
+
def search_for_version(self, line):
if self.nc_version_ == None:
expression = re.compile(r"Product version:.*Native (\d+)\.(\d+)\.(\d+)-")
@@ -218,7 +220,12 @@ class ClientMessageDecoder(DecoderBase):
parts = []
if self.get_send_trace_parts(line, parts):
- send_trace["Timestamp"], send_trace["tid"], send_trace["Connection"], message_bytes = parts
+ (
+ send_trace["Timestamp"],
+ send_trace["tid"],
+ send_trace["Connection"],
+ message_bytes,
+ ) = parts
is_send_trace = True
elif self.get_add_security_trace_parts(line, parts):
timestamp, tid, connection, security_footer_length, message_bytes = parts
@@ -250,6 +257,9 @@ class ClientMessageDecoder(DecoderBase):
parse_client_message(send_trace, message_bytes)
self.output_queue_.put({"message": send_trace})
+ global_protocol_state.set_last_client_message(
+ send_trace["tid"], send_trace["Type"]
+ )
elif self.connection_states_[connection] == self.STATE_FOUND_SECURITY_FOOTER_:
if is_send_trace:
send_trace["Direction"] = "--->"
@@ -261,3 +271,6 @@ class ClientMessageDecoder(DecoderBase):
send_trace["SecurityFlag"],
) = self.parse_request_fields(message_bytes)
self.output_queue_.put({"message": send_trace})
+ global_protocol_state.set_last_client_message(
+ send_trace["tid"], send_trace["Type"]
+ )
diff --git a/tools/gnmsg/client_messages.py b/tools/gnmsg/client_messages.py
index a6f03ae..c4bb65c 100644
--- a/tools/gnmsg/client_messages.py
+++ b/tools/gnmsg/client_messages.py
@@ -382,20 +382,27 @@ def read_execute_function_message(properties, message_bytes, offset):
(properties["FunctionName"], offset) = parse_region_part(message_bytes, offset)
(properties["Arguments"], offset) = parse_object_part(message_bytes, offset)
+
def parse_getall_optional_callback_arguments(message_bytes, offset):
(local_object, local_offset) = parse_object_part(message_bytes, offset)
- if (local_object["IsObject"] == 0):
+ if local_object["IsObject"] == 0:
(local_object, local_offset) = parse_raw_int_part(message_bytes, offset)
return (local_object, local_offset)
+
def read_get_all_70_message(properties, message_bytes, offset):
(properties["Region"], offset) = parse_region_part(message_bytes, offset)
(properties["KeyList"], offset) = parse_key_or_value(message_bytes, offset)
- (properties["CallbackArguments"], offset) = parse_getall_optional_callback_arguments(message_bytes, offset)
+ (
+ properties["CallbackArguments"],
+ offset,
+ ) = parse_getall_optional_callback_arguments(message_bytes, offset)
+
def read_key_set(properties, message_bytes, offset):
(properties["Region"], offset) = parse_region_part(message_bytes, offset)
+
client_message_parsers = {
"PUT": read_put_message,
"REQUEST": read_request_message,
@@ -422,7 +429,9 @@ def parse_client_message(properties, message_bytes):
offset = CHARS_IN_MESSAGE_HEADER
if properties["Type"] in client_message_parsers.keys():
try:
- client_message_parsers[properties["Type"]](properties, message_bytes, offset)
+ client_message_parsers[properties["Type"]](
+ properties, message_bytes, offset
+ )
except:
properties["ERROR"] = "Exception reading message - probably incomplete"
- return
\ No newline at end of file
+ return
diff --git a/tools/gnmsg/command_line.py b/tools/gnmsg/command_line.py
index aea92d3..e19a988 100755
--- a/tools/gnmsg/command_line.py
+++ b/tools/gnmsg/command_line.py
@@ -37,7 +37,9 @@ def parse_command_line():
help="(optionally) print out regular message details",
)
- parser.add_argument("--thread_id", metavar="T", nargs="?", help="Show only messages on this thread")
+ parser.add_argument(
+ "--thread_id", metavar="T", nargs="?", help="Show only messages on this thread"
+ )
args = parser.parse_args()
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index fd27b83..e14499c 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -75,7 +75,9 @@ def scan_file(filename, dump_handshake, dump_messages, thread_id):
if key == "message" and dump_messages:
if thread_id:
if "tid" in value.keys() and value["tid"] == thread_id:
- print(separator + json.dumps(value, indent=2, default=str))
+ print(
+ separator + json.dumps(value, indent=2, default=str)
+ )
separator = ","
else:
print(separator + json.dumps(value, indent=2, default=str))
diff --git a/tools/gnmsg/numeric_conversion.py b/tools/gnmsg/gnmsg_globals.py
similarity index 76%
copy from tools/gnmsg/numeric_conversion.py
copy to tools/gnmsg/gnmsg_globals.py
index 97472a5..0298ff3 100644
--- a/tools/gnmsg/numeric_conversion.py
+++ b/tools/gnmsg/gnmsg_globals.py
@@ -13,21 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-to_hex_digit = {
- 0: "0",
- 1: "1",
- 2: "2",
- 3: "3",
- 4: "4",
- 5: "5",
- 6: "6",
- 7: "7",
- 8: "8",
- 9: "9",
- 10: "a",
- 11: "b",
- 12: "c",
- 13: "d",
- 14: "e",
- 15: "f",
-}
+import protocol_state
+
+
+# global protocol state. We need to keep track of (at least) the last client
+# message sent, per thread, in order to decode server responses.
+global_protocol_state = protocol_state.ProtocolState()
diff --git a/tools/gnmsg/numeric_conversion.py b/tools/gnmsg/numeric_conversion.py
index 97472a5..da6c959 100644
--- a/tools/gnmsg/numeric_conversion.py
+++ b/tools/gnmsg/numeric_conversion.py
@@ -31,3 +31,9 @@ to_hex_digit = {
14: "e",
15: "f",
}
+
+
+def decimal_string_to_hex_string(byte):
+ high_nibble = int(int(byte) / 16)
+ low_nibble = int(byte) % 16
+ return to_hex_digit[high_nibble] + to_hex_digit[low_nibble]
diff --git a/tools/gnmsg/numeric_conversion.py b/tools/gnmsg/protocol_state.py
similarity index 62%
copy from tools/gnmsg/numeric_conversion.py
copy to tools/gnmsg/protocol_state.py
index 97472a5..7465644 100644
--- a/tools/gnmsg/numeric_conversion.py
+++ b/tools/gnmsg/protocol_state.py
@@ -13,21 +13,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-to_hex_digit = {
- 0: "0",
- 1: "1",
- 2: "2",
- 3: "3",
- 4: "4",
- 5: "5",
- 6: "6",
- 7: "7",
- 8: "8",
- 9: "9",
- 10: "a",
- 11: "b",
- 12: "c",
- 13: "d",
- 14: "e",
- 15: "f",
-}
+
+from message_types import message_types
+from read_values import read_int_value, read_byte_value, call_reader_function
+
+
+class ProtocolState:
+ def __init__(self):
+ self.last_client_message_ = {}
+
+ def get_last_client_message(self, thread_id):
+ return self.last_client_message_[thread_id]
+
+ def set_last_client_message(self, thread_id, client_message):
+ self.last_client_message_[thread_id] = client_message
diff --git a/tools/gnmsg/read_values.py b/tools/gnmsg/read_values.py
index c3bb7f7..2c9fb20 100644
--- a/tools/gnmsg/read_values.py
+++ b/tools/gnmsg/read_values.py
@@ -21,13 +21,23 @@ def read_number_from_hex_string(string, offset, size):
bits = size * 4
if value & (1 << (bits - 1)):
value -= 1 << bits
- return (value, size)
+ return value, size
+
+
+def read_unsigned_number_from_hex_string(string, offset, size):
+ value = int(string[offset : offset + size], 16)
+ bits = size * 4
+ return value, size
def read_byte_value(string, offset):
return read_number_from_hex_string(string, offset, 2)
+def read_unsigned_byte_value(string, offset):
+ return read_unsigned_number_from_hex_string(string, offset, 2)
+
+
def read_short_value(string, offset):
return read_number_from_hex_string(string, offset, 4)
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index 46ca955..c14e7db 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -22,9 +22,11 @@ from dateutil import parser
from server_messages import parse_server_message
from decoder_base import DecoderBase
from message_types import message_types
-from numeric_conversion import to_hex_digit
+from numeric_conversion import to_hex_digit, decimal_string_to_hex_string
from chunked_message_decoder import ChunkedResponseDecoder
from read_values import read_number_from_hex_string
+from gnmsg_globals import global_protocol_state
+
class ServerMessageDecoder(DecoderBase):
def __init__(self, output_queue):
@@ -70,9 +72,9 @@ class ServerMessageDecoder(DecoderBase):
minor = match.group(2)
patch = match.group(3)
self.nc_version_ = major + "." + minor + "." + patch
- self.receive_trace_parts_retriever_ = self.get_receive_trace_parts_functions_[
- self.nc_version_
- ]
+ self.receive_trace_parts_retriever_ = (
+ self.get_receive_trace_parts_functions_[self.nc_version_]
+ )
self.receive_trace_parser_ = self.receive_trace_parsers_[
self.nc_version_
]
@@ -90,10 +92,11 @@ class ServerMessageDecoder(DecoderBase):
result = True
return result
+
def get_receive_trace_header_with_pointer(self, line, parts):
result = False
expression = re.compile(
- r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage:\s*\[([\d|a-f|A-F|x|X]+).*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+ r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+(\d+)\]\s*TcrConnection::readMessage\(([\d|a-f|A-F|x|X]+)\):.*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
)
match = expression.search(line)
if match:
@@ -192,7 +195,6 @@ class ServerMessageDecoder(DecoderBase):
return result
-
def get_response_header(self, line, parts):
# Check if this is a header for a chunked message
result = False
@@ -293,17 +295,12 @@ class ServerMessageDecoder(DecoderBase):
return result
- def decimal_string_to_hex_string(self, byte):
- high_nibble = int(int(byte) / 16)
- low_nibble = int(byte) % 16
- return to_hex_digit[high_nibble] + to_hex_digit[low_nibble]
-
def format_bytes_as_hex_v911(self, message_bytes):
byte_list = message_bytes.split(" ")
hex_string = ""
for byte in byte_list:
if byte:
- hex_string += self.decimal_string_to_hex_string(byte)
+ hex_string += decimal_string_to_hex_string(byte)
return hex_string
def parse_response_fields_base(self, message_bytes):
@@ -357,13 +354,14 @@ class ServerMessageDecoder(DecoderBase):
pass
elif self.get_receive_trace_parts(line, parts):
tid = parts[1]
- last_header = {"Timestamp": parts[0],
- "tid": tid,
- "Connection": parts[2]}
+ last_header = {"Timestamp": parts[0], "tid": tid, "Connection": parts[2]}
message_bytes = parts[3]
self.headers_[tid] = last_header
connection = parts[2]
- if connection in self.connection_states_.keys() and self.connection_states_[connection] != self.STATE_NEUTRAL_:
+ if (
+ connection in self.connection_states_.keys()
+ and self.connection_states_[connection] != self.STATE_NEUTRAL_
+ ):
print("WARNING: Multiple headers rec'd without a message body.")
self.connection_states_[connection] = self.STATE_NEUTRAL_
elif self.get_receive_trace_body_parts(line, parts):
@@ -378,7 +376,7 @@ class ServerMessageDecoder(DecoderBase):
connection = parts[2]
self.chunk_decoder.add_header(parts[2], parts[4])
elif self.get_chunk_header(line, parts):
- flags = 0xff
+ flags = 0xFF
size = 0
tid = parts[1]
(flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
@@ -387,7 +385,9 @@ class ServerMessageDecoder(DecoderBase):
tid = parts[1]
self.chunk_decoder.add_chunk(parts[3])
if self.chunk_decoder.is_complete_message():
- self.output_queue_.put({"message": self.chunk_decoder.get_decoded_message()})
+ self.output_queue_.put(
+ {"message": self.chunk_decoder.get_decoded_message()}
+ )
self.chunk_decoder.reset()
else:
return
@@ -421,4 +421,3 @@ class ServerMessageDecoder(DecoderBase):
parse_server_message(receive_trace, message_body)
self.connection_states_[connection] = self.STATE_NEUTRAL_
self.output_queue_.put({"message": receive_trace})
-
diff --git a/tools/gnmsg/server_messages.py b/tools/gnmsg/server_messages.py
index b0a4758..4aa999a 100644
--- a/tools/gnmsg/server_messages.py
+++ b/tools/gnmsg/server_messages.py
@@ -16,10 +16,13 @@
from read_values import (
call_reader_function,
read_int_value,
+ read_unsigned_byte_value,
read_byte_value,
read_cacheable,
parse_key_or_value,
)
+from numeric_conversion import decimal_string_to_hex_string
+from gnmsg_globals import global_protocol_state
def read_bucket_count(message_bytes, offset):
@@ -32,18 +35,22 @@ def read_bucket_count(message_bytes, offset):
)
(object_part["Data"], offset) = read_cacheable(message_bytes, offset)
- return (object_part, offset)
+ return object_part, offset
def read_partition_attributes(properties, message_bytes, offset):
(properties["BucketCount"], offset) = read_bucket_count(message_bytes, offset)
(properties["ColocatedWith"], offset) = parse_key_or_value(message_bytes, offset)
if properties["Parts"] == 4:
- (properties["PartitionResolverName"], offset) = parse_key_or_value(message_bytes, offset)
+ (properties["PartitionResolverName"], offset) = parse_key_or_value(
+ message_bytes, offset
+ )
# TODO: parse part 4 (list of partition attributes)
elif properties["Parts"] == 3:
try:
- (properties["PartitionResolverName"], offset) = parse_key_or_value(message_bytes, offset)
+ (properties["PartitionResolverName"], offset) = parse_key_or_value(
+ message_bytes, offset
+ )
except:
raise Exception(
"Don't know how to parse a RESPONSE_CLIENT_PARTITION_ATTRIBUTES message with "
@@ -52,8 +59,78 @@ def read_partition_attributes(properties, message_bytes, offset):
# TODO: parse part 3 if it is not partition resolver but list of partition attributes
+def read_object_header(message_bytes, offset):
+ object_base = {}
+ (object_base["Size"], offset) = call_reader_function(
+ message_bytes, offset, read_int_value
+ )
+ (object_base["IsObject"], offset) = call_reader_function(
+ message_bytes, offset, read_byte_value
+ )
+ return object_base, offset
+
+
+def read_bytes_part(message_bytes, offset):
+ bytes_part, offset = read_object_header(message_bytes, offset)
+
+ bytes_string = ""
+ for i in range(bytes_part["Size"]):
+ if i:
+ bytes_string += " "
+ byte_val, offset = call_reader_function(message_bytes, offset, read_byte_value)
+ bytes_string += decimal_string_to_hex_string(str(byte_val))
+ bytes_part["Bytes"] = bytes_string
+ return bytes_part, offset
+
+
+def read_int_part(message_bytes, offset):
+ int_part, offset = read_object_header(message_bytes, offset)
+
+ int_value, offset = call_reader_function(message_bytes, offset, read_int_value)
+ int_part["Value"] = decimal_string_to_hex_string(str(int_value))
+ return int_part, offset
+
+
+def read_old_value_part(message_bytes, offset):
+ old_value_part, offset = read_object_header(message_bytes, offset)
+
+ bytes_string = ""
+ for i in range(old_value_part["Size"]):
+ if i:
+ bytes_string += " "
+ byte_val, offset = call_reader_function(
+ message_bytes, offset, read_unsigned_byte_value
+ )
+ bytes_string += decimal_string_to_hex_string(str(byte_val))
+ old_value_part["Bytes"] = bytes_string
+ return old_value_part, offset
+
+
+def read_object_part(message_bytes, offset):
+ object_part, offset = read_object_header(message_bytes, offset)
+
+ offset += 2 * object_part["Size"]
+ return object_part, offset
+
+
+def read_put_reply(properties, message_bytes, offset):
+ (properties["Bytes"], offset) = read_bytes_part(message_bytes, offset)
+ (properties["Flags"], offset) = read_int_part(message_bytes, offset)
+ if properties["Parts"] >= 3:
+ (properties["OldValue"], offset) = read_old_value_part(message_bytes, offset)
+ if properties["Parts"] == 4:
+ (properties["VersionTag"], offset) = read_object_part(message_bytes, offset)
+
+
+def read_contains_key_response(properties, message_bytes, offset):
+ properties["Response"] = parse_key_or_value(message_bytes, offset)
+ return properties, offset
+
+
server_message_parsers = {
"RESPONSE_CLIENT_PARTITION_ATTRIBUTES": read_partition_attributes,
+ "PUT_REPLY": read_put_reply,
+ "CONTAINS_KEY_RESPONSE": read_contains_key_response,
}
@@ -61,3 +138,11 @@ def parse_server_message(properties, message_bytes):
offset = 0
if properties["Type"] in server_message_parsers.keys():
server_message_parsers[properties["Type"]](properties, message_bytes, offset)
+ else:
+ key = (
+ global_protocol_state.get_last_client_message(properties["tid"])
+ + "_"
+ + properties["Type"]
+ )
+ if key in server_message_parsers.keys():
+ server_message_parsers[key](properties, message_bytes, offset)