You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2020/12/18 18:48:28 UTC

[geode-native] branch develop updated: GEODE-8532: parse chunked responses in gnmsg tool (#702)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8b0a5dc  GEODE-8532: parse chunked responses in gnmsg tool (#702)
8b0a5dc is described below

commit 8b0a5dc4294316241d6cbfce62a03dad10e1d18c
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Fri Dec 18 10:48:17 2020 -0800

    GEODE-8532: parse chunked responses in gnmsg tool (#702)
    
    - basic chunk handling works
    - output is just message type and chunk length for each chunk
    - Need to handle chunked responses coming back from simultaneous connections
    - Conceivably these can come back out-of-order, so logging the 'this' pointer
    for the connection allows parsing code to disambiguate
---
 cppcache/src/TcrConnection.cpp         |  18 +++---
 tools/gnmsg/chunked_message_decoder.py |  81 +++++++++++++++++++++++++
 tools/gnmsg/server_message_decoder.py  | 104 ++++++++++++++++++++++++++++++++-
 3 files changed, 193 insertions(+), 10 deletions(-)

diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 602c084..40d1238 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -678,7 +678,7 @@ char* TcrConnection::readMessage(size_t* recvLen,
   }
 
   LOGDEBUG(
-      "TcrConnection::readMessage: [%p] received header from endpoint %s; "
+      "TcrConnection::readMessage(%p): received header from endpoint %s; "
       "bytes: %s",
       this, m_endpointObj->name().c_str(),
       Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str());
@@ -815,9 +815,9 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
   }
 
   LOGDEBUG(
-      "TcrConnection::readResponseHeader: received header from "
+      "TcrConnection::readResponseHeader(%p): received header from "
       "endpoint %s; bytes: %s",
-      m_endpointObj->name().c_str(),
+      this, m_endpointObj->name().c_str(),
       Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
 
   auto input = m_connectionManager.getCacheImpl()->createDataInput(
@@ -828,11 +828,11 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
   header.header.chunkLength = input.readInt32();
   header.header.flags = input.read();
   LOGDEBUG(
-      "TcrConnection::readResponseHeader: "
+      "TcrConnection::readResponseHeader(%p): "
       "messageType=%" PRId32 ", numberOfParts=%" PRId32
       ", transactionId=%" PRId32 ", chunkLength=%" PRId32
       ", lastChunkAndSecurityFlags=0x%" PRIx8,
-      header.messageType, header.numberOfParts, header.transactionId,
+      this, header.messageType, header.numberOfParts, header.transactionId,
       header.header.chunkLength, header.header.flags);
 
   return header;
@@ -867,9 +867,9 @@ chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
   header.chunkLength = input.readInt32();
   header.flags = input.read();
   LOGDEBUG(
-      "TcrConnection::readChunkHeader: "
+      "TcrConnection::readChunkHeader(%p): "
       ", chunkLen=%" PRId32 ", lastChunkAndSecurityFlags=0x%" PRIx8,
-      header.chunkLength, header.flags);
+      this, header.chunkLength, header.flags);
 
   return header;
 }
@@ -892,9 +892,9 @@ std::vector<uint8_t> TcrConnection::readChunkBody(
   }
 
   LOGDEBUG(
-      "TcrConnection::readChunkBody: received chunk body from endpoint "
+      "TcrConnection::readChunkBody(%p): received chunk body from endpoint "
       "%s; bytes: %s",
-      m_endpointObj->name().c_str(),
+      this, m_endpointObj->name().c_str(),
       Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str());
   return chunkBody;
 }
diff --git a/tools/gnmsg/chunked_message_decoder.py b/tools/gnmsg/chunked_message_decoder.py
new file mode 100644
index 0000000..e2be4b3
--- /dev/null
+++ b/tools/gnmsg/chunked_message_decoder.py
@@ -0,0 +1,81 @@
+#!/usr/local/bin/python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+import struct
+
+from message_types import message_types
+from read_values import read_int_value, read_byte_value, call_reader_function
+
+CHUNKED_MESSAGE_HEADER_LENGTH = 17
+
+class ChunkedResponseDecoder:
+    def __init__(self):
+        self.reset()
+
+    def add_header(self, connection, header):
+        if len(self.chunked_message) > 0:
+            raise Exception("Previous chunked message is not completed, can't process another header")
+
+        if len(header) == 2 * CHUNKED_MESSAGE_HEADER_LENGTH:
+            offset = 0
+            message_type = ""
+            (message_type, offset) = call_reader_function(header, offset, read_int_value)
+            self.chunked_message["Type"] = message_types[message_type]
+            # Connection identifier supplied by the caller (the pointer value captured from the log line)
+            self.chunked_message["Connection"] = connection
+            self.chunked_message["Direction"] = "<---"
+            (self.chunked_message["Parts"], offset) = call_reader_function(header, offset, read_int_value)
+            (self.chunked_message["TransactionId"], offset) = call_reader_function(header, offset, read_int_value)
+            chunk_size = 0
+            flags = 0
+            (chunk_size, offset) = call_reader_function(header, offset, read_int_value)
+            (flags, offset) = call_reader_function(header, offset, read_byte_value)
+            self.chunked_message["ChunkInfo"] = []
+            self.add_chunk_header(chunk_size, flags)
+        else:
+            raise IndexError("Chunked message header should be " + str(CHUNKED_MESSAGE_HEADER_LENGTH) + " bytes")
+
+    def add_chunk_header(self, chunk_size, flags):
+        self.chunk_index += 1
+        if len(self.chunked_message) == 0:
+            raise Exception("Can't add chunk header before message header")
+
+        key = "Chunk" + str(self.chunk_index)
+        inner_item = dict(ChunkLength=int(chunk_size), Flags=flags)
+        outer_item = {}
+        outer_item[key] = inner_item
+        self.chunked_message["ChunkInfo"].append(outer_item)
+        self.chunk_flags = flags
+
+    def add_chunk(self, chunk):
+        if len(self.chunked_message) == 0:
+            raise Exception("Can't add chunks before message header")
+
+        self.message_body += chunk
+
+    def is_complete_message(self):
+        return self.chunk_flags & 0x1
+
+    def get_decoded_message(self):
+        return self.chunked_message
+
+    def reset(self):
+        self.header = ""
+        self.message_body = ""
+        self.chunked_message = {}
+        self.complete = False
+        self.chunk_flags = 0xff
+        self.chunk_index = -1
\ No newline at end of file
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index 5c3eab5..b6fcc96 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -22,7 +22,8 @@ from server_messages import parse_server_message
 from decoder_base import DecoderBase
 from message_types import message_types
 from numeric_conversion import to_hex_digit
-
+from chunked_message_decoder import ChunkedResponseDecoder
+from read_values import read_number_from_hex_string
 
 class ServerMessageDecoder(DecoderBase):
     def __init__(self, output_queue):
@@ -50,6 +51,7 @@ class ServerMessageDecoder(DecoderBase):
             "10.1.3": self.parse_response_fields_base,
             "9.1.1": self.parse_response_fields_v911,
         }
+        self.chunk_decoder = ChunkedResponseDecoder()
 
     def search_for_version(self, line):
         if self.nc_version_ == None:
@@ -148,6 +150,93 @@ class ServerMessageDecoder(DecoderBase):
 
         return result
 
+
+    def get_response_header(self, line, parts):
+        # Check if this is a header for a chunked message
+        result = False
+
+        expression = re.compile(
+            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+        )
+        match = expression.search(line)
+        if match:
+            parts.append(parser.parse(match.group(1)))
+            parts.append(match.group(2))
+            parts.append(match.group(3))
+            parts.append(match.group(4))
+            result = True
+
+        if not result:
+            expression = re.compile(
+                r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            )
+            match = expression.search(line)
+            if match:
+                parts.append(parser.parse(match.group(1)))
+                parts.append("")
+                parts.append(match.group(2))
+                parts.append(match.group(3))
+                result = True
+
+        return result
+
+    def get_chunk_header(self, line, parts):
+        # Check if this is the header of an individual chunk within a chunked message
+        result = False
+        expression = re.compile(
+            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader\(([0-9|a-f|A-F|x]+)\):\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+        )
+        match = expression.search(line)
+        if match:
+            parts.append(parser.parse(match.group(1)))
+            parts.append(match.group(2))
+            parts.append(match.group(3))
+            parts.append(match.group(4))
+            result = True
+
+        if not result:
+            expression = re.compile(
+                r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+            )
+            match = expression.search(line)
+            if match:
+                parts.append(parser.parse(match.group(1)))
+                parts.append("")
+                parts.append(match.group(2))
+                parts.append(match.group(3))
+                result = True
+
+        return result
+
+    def get_chunk_bytes(self, line, parts):
+        # Check if this is the body of a message chunk.
+        # If it is, capture its fields in parts; the caller adds it to the chunked decoder
+        result = False
+        expression = re.compile(
+            r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+        )
+        match = expression.search(line)
+        if match:
+            parts.append(parser.parse(match.group(1)))
+            parts.append(match.group(2))
+            parts.append(match.group(3))
+            parts.append(match.group(4))
+            result = True
+
+        if not result:
+            expression = re.compile(
+                r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+            )
+            match = expression.search(line)
+            if match:
+                parts.append(parser.parse(match.group(1)))
+                parts.append("")
+                parts.append(match.group(2))
+                parts.append(match.group(3))
+                result = True
+
+        return result
+
     def decimal_string_to_hex_string(self, byte):
         high_nibble = int(int(byte) / 16)
         low_nibble = int(byte) % 16
@@ -202,6 +291,7 @@ class ServerMessageDecoder(DecoderBase):
         connection = None
         message_bytes = None
         message_body = None
+        chunk_bytes = None
 
         self.search_for_version(line)
 
@@ -216,6 +306,18 @@ class ServerMessageDecoder(DecoderBase):
             message_body = parts[0]
         elif self.get_add_security_trace_parts(line, parts):
             connection = parts[1]
+        elif self.get_response_header(line, parts):
+            self.chunk_decoder.add_header(parts[1], parts[3])
+        elif self.get_chunk_header(line, parts):
+            flags = 0xff
+            size = 0
+            (flags, size) = read_number_from_hex_string(parts[3], 2, len(parts[3]) - 2)
+            self.chunk_decoder.add_chunk_header(parts[2], flags)
+        elif self.get_chunk_bytes(line, parts):
+            self.chunk_decoder.add_chunk(parts[3])
+            if self.chunk_decoder.is_complete_message():
+                self.output_queue_.put({"message": self.chunk_decoder.get_decoded_message()})
+                self.chunk_decoder.reset()
         else:
             return