You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/01/29 18:26:20 UTC

[geode-native] branch develop updated: GEODE-8871: parse server response messages for PUT and CONTAINS_KEY (#730)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6391584  GEODE-8871: parse server response messages for PUT and CONTAINS_KEY (#730)
6391584 is described below

commit 6391584c195a85714fff62cfa1e0c1d0ea6de6aa
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Fri Jan 29 10:26:11 2021 -0800

    GEODE-8871: parse server response messages for PUT and CONTAINS_KEY (#730)
    
    - Parse server response messages for PUT and CONTAINS_KEY
    - Fix bad regex (copy/paste error)
    - Also apply some formatting fixes
---
 tools/gnmsg/client_message_decoder.py              | 17 +++-
 tools/gnmsg/client_messages.py                     | 17 +++-
 tools/gnmsg/command_line.py                        |  4 +-
 tools/gnmsg/gnmsg.py                               |  4 +-
 .../{numeric_conversion.py => gnmsg_globals.py}    | 24 ++----
 tools/gnmsg/numeric_conversion.py                  |  6 ++
 .../{numeric_conversion.py => protocol_state.py}   | 32 ++++----
 tools/gnmsg/read_values.py                         | 12 ++-
 tools/gnmsg/server_message_decoder.py              | 37 +++++----
 tools/gnmsg/server_messages.py                     | 91 +++++++++++++++++++++-
 10 files changed, 177 insertions(+), 67 deletions(-)

diff --git a/tools/gnmsg/client_message_decoder.py b/tools/gnmsg/client_message_decoder.py
index 4631747..0c1aaa9 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -22,6 +22,7 @@ from client_messages import parse_client_message
 from decoder_base import DecoderBase
 from message_types import message_types
 from numeric_conversion import to_hex_digit
+from gnmsg_globals import global_protocol_state
 
 
 class ClientMessageDecoder(DecoderBase):
@@ -84,12 +85,13 @@ class ClientMessageDecoder(DecoderBase):
             "PERIODIC_ACK",
             "PING",
             "REQUEST_EVENT_VALUE",
-            "ROLLBACK"
+            "ROLLBACK",
             "SIZE",
             "TX_FAILOVER",
             "TX_SYNCHRONIZATION",
             "USER_CREDENTIAL_MESSAGE",
         ]
+
     def search_for_version(self, line):
         if self.nc_version_ == None:
             expression = re.compile(r"Product version:.*Native (\d+)\.(\d+)\.(\d+)-")
@@ -218,7 +220,12 @@ class ClientMessageDecoder(DecoderBase):
 
         parts = []
         if self.get_send_trace_parts(line, parts):
-            send_trace["Timestamp"], send_trace["tid"], send_trace["Connection"], message_bytes = parts
+            (
+                send_trace["Timestamp"],
+                send_trace["tid"],
+                send_trace["Connection"],
+                message_bytes,
+            ) = parts
             is_send_trace = True
         elif self.get_add_security_trace_parts(line, parts):
             timestamp, tid, connection, security_footer_length, message_bytes = parts
@@ -250,6 +257,9 @@ class ClientMessageDecoder(DecoderBase):
 
                 parse_client_message(send_trace, message_bytes)
                 self.output_queue_.put({"message": send_trace})
+                global_protocol_state.set_last_client_message(
+                    send_trace["tid"], send_trace["Type"]
+                )
         elif self.connection_states_[connection] == self.STATE_FOUND_SECURITY_FOOTER_:
             if is_send_trace:
                 send_trace["Direction"] = "--->"
@@ -261,3 +271,6 @@ class ClientMessageDecoder(DecoderBase):
                     send_trace["SecurityFlag"],
                 ) = self.parse_request_fields(message_bytes)
                 self.output_queue_.put({"message": send_trace})
+                global_protocol_state.set_last_client_message(
+                    send_trace["tid"], send_trace["Type"]
+                )
diff --git a/tools/gnmsg/client_messages.py b/tools/gnmsg/client_messages.py
index a6f03ae..c4bb65c 100644
--- a/tools/gnmsg/client_messages.py
+++ b/tools/gnmsg/client_messages.py
@@ -382,20 +382,27 @@ def read_execute_function_message(properties, message_bytes, offset):
     (properties["FunctionName"], offset) = parse_region_part(message_bytes, offset)
     (properties["Arguments"], offset) = parse_object_part(message_bytes, offset)
 
+
 def parse_getall_optional_callback_arguments(message_bytes, offset):
     (local_object, local_offset) = parse_object_part(message_bytes, offset)
-    if (local_object["IsObject"] == 0):
+    if local_object["IsObject"] == 0:
         (local_object, local_offset) = parse_raw_int_part(message_bytes, offset)
     return (local_object, local_offset)
 
+
 def read_get_all_70_message(properties, message_bytes, offset):
     (properties["Region"], offset) = parse_region_part(message_bytes, offset)
     (properties["KeyList"], offset) = parse_key_or_value(message_bytes, offset)
-    (properties["CallbackArguments"], offset) = parse_getall_optional_callback_arguments(message_bytes, offset)
+    (
+        properties["CallbackArguments"],
+        offset,
+    ) = parse_getall_optional_callback_arguments(message_bytes, offset)
+
 
 def read_key_set(properties, message_bytes, offset):
     (properties["Region"], offset) = parse_region_part(message_bytes, offset)
 
+
 client_message_parsers = {
     "PUT": read_put_message,
     "REQUEST": read_request_message,
@@ -422,7 +429,9 @@ def parse_client_message(properties, message_bytes):
     offset = CHARS_IN_MESSAGE_HEADER
     if properties["Type"] in client_message_parsers.keys():
         try:
-            client_message_parsers[properties["Type"]](properties, message_bytes, offset)
+            client_message_parsers[properties["Type"]](
+                properties, message_bytes, offset
+            )
         except:
             properties["ERROR"] = "Exception reading message - probably incomplete"
-            return
\ No newline at end of file
+            return
diff --git a/tools/gnmsg/command_line.py b/tools/gnmsg/command_line.py
index aea92d3..e19a988 100755
--- a/tools/gnmsg/command_line.py
+++ b/tools/gnmsg/command_line.py
@@ -37,7 +37,9 @@ def parse_command_line():
         help="(optionally) print out regular message details",
     )
 
-    parser.add_argument("--thread_id", metavar="T", nargs="?", help="Show only messages on this thread")
+    parser.add_argument(
+        "--thread_id", metavar="T", nargs="?", help="Show only messages on this thread"
+    )
 
     args = parser.parse_args()
 
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index fd27b83..e14499c 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -75,7 +75,9 @@ def scan_file(filename, dump_handshake, dump_messages, thread_id):
                     if key == "message" and dump_messages:
                         if thread_id:
                             if "tid" in value.keys() and value["tid"] == thread_id:
-                                print(separator + json.dumps(value, indent=2, default=str))
+                                print(
+                                    separator + json.dumps(value, indent=2, default=str)
+                                )
                                 separator = ","
                         else:
                             print(separator + json.dumps(value, indent=2, default=str))
diff --git a/tools/gnmsg/numeric_conversion.py b/tools/gnmsg/gnmsg_globals.py
similarity index 76%
copy from tools/gnmsg/numeric_conversion.py
copy to tools/gnmsg/gnmsg_globals.py
index 97472a5..0298ff3 100644
--- a/tools/gnmsg/numeric_conversion.py
+++ b/tools/gnmsg/gnmsg_globals.py
@@ -13,21 +13,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-to_hex_digit = {
-    0: "0",
-    1: "1",
-    2: "2",
-    3: "3",
-    4: "4",
-    5: "5",
-    6: "6",
-    7: "7",
-    8: "8",
-    9: "9",
-    10: "a",
-    11: "b",
-    12: "c",
-    13: "d",
-    14: "e",
-    15: "f",
-}
+import protocol_state
+
+
+# global protocol state.  We need to keep track of (at least) the last client
+# message sent, per thread, in order to decode server responses.
+global_protocol_state = protocol_state.ProtocolState()
diff --git a/tools/gnmsg/numeric_conversion.py b/tools/gnmsg/numeric_conversion.py
index 97472a5..da6c959 100644
--- a/tools/gnmsg/numeric_conversion.py
+++ b/tools/gnmsg/numeric_conversion.py
@@ -31,3 +31,9 @@ to_hex_digit = {
     14: "e",
     15: "f",
 }
+
+
+def decimal_string_to_hex_string(byte):
+    high_nibble = int(int(byte) / 16)
+    low_nibble = int(byte) % 16
+    return to_hex_digit[high_nibble] + to_hex_digit[low_nibble]
diff --git a/tools/gnmsg/numeric_conversion.py b/tools/gnmsg/protocol_state.py
similarity index 62%
copy from tools/gnmsg/numeric_conversion.py
copy to tools/gnmsg/protocol_state.py
index 97472a5..7465644 100644
--- a/tools/gnmsg/numeric_conversion.py
+++ b/tools/gnmsg/protocol_state.py
@@ -13,21 +13,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-to_hex_digit = {
-    0: "0",
-    1: "1",
-    2: "2",
-    3: "3",
-    4: "4",
-    5: "5",
-    6: "6",
-    7: "7",
-    8: "8",
-    9: "9",
-    10: "a",
-    11: "b",
-    12: "c",
-    13: "d",
-    14: "e",
-    15: "f",
-}
+
+from message_types import message_types
+from read_values import read_int_value, read_byte_value, call_reader_function
+
+
+class ProtocolState:
+    def __init__(self):
+        self.last_client_message_ = {}
+
+    def get_last_client_message(self, thread_id):
+        return self.last_client_message_[thread_id]
+
+    def set_last_client_message(self, thread_id, client_message):
+        self.last_client_message_[thread_id] = client_message
diff --git a/tools/gnmsg/read_values.py b/tools/gnmsg/read_values.py
index c3bb7f7..2c9fb20 100644
--- a/tools/gnmsg/read_values.py
+++ b/tools/gnmsg/read_values.py
@@ -21,13 +21,23 @@ def read_number_from_hex_string(string, offset, size):
     bits = size * 4
     if value & (1 << (bits - 1)):
         value -= 1 << bits
-    return (value, size)
+    return value, size
+
+
+def read_unsigned_number_from_hex_string(string, offset, size):
+    value = int(string[offset : offset + size], 16)
+    bits = size * 4
+    return value, size
 
 
 def read_byte_value(string, offset):
     return read_number_from_hex_string(string, offset, 2)
 
 
+def read_unsigned_byte_value(string, offset):
+    return read_unsigned_number_from_hex_string(string, offset, 2)
+
+
 def read_short_value(string, offset):
     return read_number_from_hex_string(string, offset, 4)
 
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index 46ca955..c14e7db 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -22,9 +22,11 @@ from dateutil import parser
 from server_messages import parse_server_message
 from decoder_base import DecoderBase
 from message_types import message_types
-from numeric_conversion import to_hex_digit
+from numeric_conversion import to_hex_digit, decimal_string_to_hex_string
 from chunked_message_decoder import ChunkedResponseDecoder
 from read_values import read_number_from_hex_string
+from gnmsg_globals import global_protocol_state
+
 
 class ServerMessageDecoder(DecoderBase):
     def __init__(self, output_queue):
@@ -70,9 +72,9 @@ class ServerMessageDecoder(DecoderBase):
                 minor = match.group(2)
                 patch = match.group(3)
                 self.nc_version_ = major + "." + minor + "." + patch
-                self.receive_trace_parts_retriever_ = self.get_receive_trace_parts_functions_[
-                    self.nc_version_
-                ]
+                self.receive_trace_parts_retriever_ = (
+                    self.get_receive_trace_parts_functions_[self.nc_version_]
+                )
                 self.receive_trace_parser_ = self.receive_trace_parsers_[
                     self.nc_version_
                 ]
@@ -90,10 +92,11 @@ class ServerMessageDecoder(DecoderBase):
             result = True
 
         return result
+
     def get_receive_trace_header_with_pointer(self, line, parts):
         result = False
         expression = re.compile(
-            r"(\d\d:\d\d:\d\d\.\d+).*:\d+\s+(\d+)\]\s*TcrConnection::readMessage:\s*\[([\d|a-f|A-F|x|X]+).*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
+            r"(\d\d:\d\d:\d\d\.\d+).+:\d+\s+(\d+)\]\s*TcrConnection::readMessage\(([\d|a-f|A-F|x|X]+)\):.*received header from endpoint.*bytes:\s*([\d|a-f|A-F]+)"
         )
         match = expression.search(line)
         if match:
@@ -192,7 +195,6 @@ class ServerMessageDecoder(DecoderBase):
 
         return result
 
-
     def get_response_header(self, line, parts):
         # Check if this is a header for a chunked message
         result = False
@@ -293,17 +295,12 @@ class ServerMessageDecoder(DecoderBase):
 
         return result
 
-    def decimal_string_to_hex_string(self, byte):
-        high_nibble = int(int(byte) / 16)
-        low_nibble = int(byte) % 16
-        return to_hex_digit[high_nibble] + to_hex_digit[low_nibble]
-
     def format_bytes_as_hex_v911(self, message_bytes):
         byte_list = message_bytes.split(" ")
         hex_string = ""
         for byte in byte_list:
             if byte:
-                hex_string += self.decimal_string_to_hex_string(byte)
+                hex_string += decimal_string_to_hex_string(byte)
         return hex_string
 
     def parse_response_fields_base(self, message_bytes):
@@ -357,13 +354,14 @@ class ServerMessageDecoder(DecoderBase):
             pass
         elif self.get_receive_trace_parts(line, parts):
             tid = parts[1]
-            last_header = {"Timestamp": parts[0],
-                           "tid": tid,
-                           "Connection": parts[2]}
+            last_header = {"Timestamp": parts[0], "tid": tid, "Connection": parts[2]}
             message_bytes = parts[3]
             self.headers_[tid] = last_header
             connection = parts[2]
-            if connection in self.connection_states_.keys() and self.connection_states_[connection] != self.STATE_NEUTRAL_:
+            if (
+                connection in self.connection_states_.keys()
+                and self.connection_states_[connection] != self.STATE_NEUTRAL_
+            ):
                 print("WARNING: Multiple headers rec'd without a message body.")
             self.connection_states_[connection] = self.STATE_NEUTRAL_
         elif self.get_receive_trace_body_parts(line, parts):
@@ -378,7 +376,7 @@ class ServerMessageDecoder(DecoderBase):
             connection = parts[2]
             self.chunk_decoder.add_header(parts[2], parts[4])
         elif self.get_chunk_header(line, parts):
-            flags = 0xff
+            flags = 0xFF
             size = 0
             tid = parts[1]
             (flags, size) = read_number_from_hex_string(parts[4], 2, len(parts[4]) - 2)
@@ -387,7 +385,9 @@ class ServerMessageDecoder(DecoderBase):
             tid = parts[1]
             self.chunk_decoder.add_chunk(parts[3])
             if self.chunk_decoder.is_complete_message():
-                self.output_queue_.put({"message": self.chunk_decoder.get_decoded_message()})
+                self.output_queue_.put(
+                    {"message": self.chunk_decoder.get_decoded_message()}
+                )
                 self.chunk_decoder.reset()
         else:
             return
@@ -421,4 +421,3 @@ class ServerMessageDecoder(DecoderBase):
                 parse_server_message(receive_trace, message_body)
                 self.connection_states_[connection] = self.STATE_NEUTRAL_
                 self.output_queue_.put({"message": receive_trace})
-
diff --git a/tools/gnmsg/server_messages.py b/tools/gnmsg/server_messages.py
index b0a4758..4aa999a 100644
--- a/tools/gnmsg/server_messages.py
+++ b/tools/gnmsg/server_messages.py
@@ -16,10 +16,13 @@
 from read_values import (
     call_reader_function,
     read_int_value,
+    read_unsigned_byte_value,
     read_byte_value,
     read_cacheable,
     parse_key_or_value,
 )
+from numeric_conversion import decimal_string_to_hex_string
+from gnmsg_globals import global_protocol_state
 
 
 def read_bucket_count(message_bytes, offset):
@@ -32,18 +35,22 @@ def read_bucket_count(message_bytes, offset):
     )
     (object_part["Data"], offset) = read_cacheable(message_bytes, offset)
 
-    return (object_part, offset)
+    return object_part, offset
 
 
 def read_partition_attributes(properties, message_bytes, offset):
     (properties["BucketCount"], offset) = read_bucket_count(message_bytes, offset)
     (properties["ColocatedWith"], offset) = parse_key_or_value(message_bytes, offset)
     if properties["Parts"] == 4:
-        (properties["PartitionResolverName"], offset) = parse_key_or_value(message_bytes, offset)
+        (properties["PartitionResolverName"], offset) = parse_key_or_value(
+            message_bytes, offset
+        )
         # TODO: parse part 4 (list of partition attributes)
     elif properties["Parts"] == 3:
         try:
-            (properties["PartitionResolverName"], offset) = parse_key_or_value(message_bytes, offset)
+            (properties["PartitionResolverName"], offset) = parse_key_or_value(
+                message_bytes, offset
+            )
         except:
             raise Exception(
                 "Don't know how to parse a RESPONSE_CLIENT_PARTITION_ATTRIBUTES message with "
@@ -52,8 +59,78 @@ def read_partition_attributes(properties, message_bytes, offset):
         # TODO: parse part 3 if it is not partition resolver but list of partition attributes
 
 
+def read_object_header(message_bytes, offset):
+    object_base = {}
+    (object_base["Size"], offset) = call_reader_function(
+        message_bytes, offset, read_int_value
+    )
+    (object_base["IsObject"], offset) = call_reader_function(
+        message_bytes, offset, read_byte_value
+    )
+    return object_base, offset
+
+
+def read_bytes_part(message_bytes, offset):
+    bytes_part, offset = read_object_header(message_bytes, offset)
+
+    bytes_string = ""
+    for i in range(bytes_part["Size"]):
+        if i:
+            bytes_string += " "
+        byte_val, offset = call_reader_function(message_bytes, offset, read_byte_value)
+        bytes_string += decimal_string_to_hex_string(str(byte_val))
+    bytes_part["Bytes"] = bytes_string
+    return bytes_part, offset
+
+
+def read_int_part(message_bytes, offset):
+    int_part, offset = read_object_header(message_bytes, offset)
+
+    int_value, offset = call_reader_function(message_bytes, offset, read_int_value)
+    int_part["Value"] = decimal_string_to_hex_string(str(int_value))
+    return int_part, offset
+
+
+def read_old_value_part(message_bytes, offset):
+    old_value_part, offset = read_object_header(message_bytes, offset)
+
+    bytes_string = ""
+    for i in range(old_value_part["Size"]):
+        if i:
+            bytes_string += " "
+        byte_val, offset = call_reader_function(
+            message_bytes, offset, read_unsigned_byte_value
+        )
+        bytes_string += decimal_string_to_hex_string(str(byte_val))
+    old_value_part["Bytes"] = bytes_string
+    return old_value_part, offset
+
+
+def read_object_part(message_bytes, offset):
+    object_part, offset = read_object_header(message_bytes, offset)
+
+    offset += 2 * object_part["Size"]
+    return object_part, offset
+
+
+def read_put_reply(properties, message_bytes, offset):
+    (properties["Bytes"], offset) = read_bytes_part(message_bytes, offset)
+    (properties["Flags"], offset) = read_int_part(message_bytes, offset)
+    if properties["Parts"] >= 3:
+        (properties["OldValue"], offset) = read_old_value_part(message_bytes, offset)
+        if properties["Parts"] == 4:
+            (properties["VersionTag"], offset) = read_object_part(message_bytes, offset)
+
+
+def read_contains_key_response(properties, message_bytes, offset):
+    properties["Response"] = parse_key_or_value(message_bytes, offset)
+    return properties, offset
+
+
 server_message_parsers = {
     "RESPONSE_CLIENT_PARTITION_ATTRIBUTES": read_partition_attributes,
+    "PUT_REPLY": read_put_reply,
+    "CONTAINS_KEY_RESPONSE": read_contains_key_response,
 }
 
 
@@ -61,3 +138,11 @@ def parse_server_message(properties, message_bytes):
     offset = 0
     if properties["Type"] in server_message_parsers.keys():
         server_message_parsers[properties["Type"]](properties, message_bytes, offset)
+    else:
+        key = (
+            global_protocol_state.get_last_client_message(properties["tid"])
+            + "_"
+            + properties["Type"]
+        )
+        if key in server_message_parsers.keys():
+            server_message_parsers[key](properties, message_bytes, offset)