You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2020/12/18 18:48:28 UTC
[geode-native] branch develop updated: GEODE-8532: parse chunked
responses in gnmsg tool (#702)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 8b0a5dc GEODE-8532: parse chunked responses in gnmsg tool (#702)
8b0a5dc is described below
commit 8b0a5dc4294316241d6cbfce62a03dad10e1d18c
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Fri Dec 18 10:48:17 2020 -0800
GEODE-8532: parse chunked responses in gnmsg tool (#702)
- basic chunk handling works
- output is just message type and chunk length for each chunk
- Need to handle chunked responses coming back from simultaneous connections
- Conceivably these can come back out-of-order, so logging the 'this' pointer
for the connection allows parsing code to disambiguate
---
cppcache/src/TcrConnection.cpp | 18 +++---
tools/gnmsg/chunked_message_decoder.py | 81 +++++++++++++++++++++++++
tools/gnmsg/server_message_decoder.py | 104 ++++++++++++++++++++++++++++++++-
3 files changed, 193 insertions(+), 10 deletions(-)
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 602c084..40d1238 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -678,7 +678,7 @@ char* TcrConnection::readMessage(size_t* recvLen,
}
LOGDEBUG(
- "TcrConnection::readMessage: [%p] received header from endpoint %s; "
+ "TcrConnection::readMessage(%p): received header from endpoint %s; "
"bytes: %s",
this, m_endpointObj->name().c_str(),
Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str());
@@ -815,9 +815,9 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
}
LOGDEBUG(
- "TcrConnection::readResponseHeader: received header from "
+ "TcrConnection::readResponseHeader(%p): received header from "
"endpoint %s; bytes: %s",
- m_endpointObj->name().c_str(),
+ this, m_endpointObj->name().c_str(),
Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
auto input = m_connectionManager.getCacheImpl()->createDataInput(
@@ -828,11 +828,11 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
header.header.chunkLength = input.readInt32();
header.header.flags = input.read();
LOGDEBUG(
- "TcrConnection::readResponseHeader: "
+ "TcrConnection::readResponseHeader(%p): "
"messageType=%" PRId32 ", numberOfParts=%" PRId32
", transactionId=%" PRId32 ", chunkLength=%" PRId32
", lastChunkAndSecurityFlags=0x%" PRIx8,
- header.messageType, header.numberOfParts, header.transactionId,
+ this, header.messageType, header.numberOfParts, header.transactionId,
header.header.chunkLength, header.header.flags);
return header;
@@ -867,9 +867,9 @@ chunkHeader TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
header.chunkLength = input.readInt32();
header.flags = input.read();
LOGDEBUG(
- "TcrConnection::readChunkHeader: "
+ "TcrConnection::readChunkHeader(%p): "
", chunkLen=%" PRId32 ", lastChunkAndSecurityFlags=0x%" PRIx8,
- header.chunkLength, header.flags);
+ this, header.chunkLength, header.flags);
return header;
}
@@ -892,9 +892,9 @@ std::vector<uint8_t> TcrConnection::readChunkBody(
}
LOGDEBUG(
- "TcrConnection::readChunkBody: received chunk body from endpoint "
+ "TcrConnection::readChunkBody(%p): received chunk body from endpoint "
"%s; bytes: %s",
- m_endpointObj->name().c_str(),
+ this, m_endpointObj->name().c_str(),
Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str());
return chunkBody;
}
diff --git a/tools/gnmsg/chunked_message_decoder.py b/tools/gnmsg/chunked_message_decoder.py
new file mode 100644
index 0000000..e2be4b3
--- /dev/null
+++ b/tools/gnmsg/chunked_message_decoder.py
@@ -0,0 +1,81 @@
+#!/usr/local/bin/python3
+
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+import struct
+
+from message_types import message_types
+from read_values import read_int_value, read_byte_value, call_reader_function
+
+CHUNKED_MESSAGE_HEADER_LENGTH = 17
+
+class ChunkedResponseDecoder:
+ def __init__(self):
+ self.reset()
+
+ def add_header(self, connection, header):
+ if len(self.chunked_message) > 0:
+ raise Exception("Previous chunked message is not completed, can't process another header")
+
+ if len(header) == 2 * CHUNKED_MESSAGE_HEADER_LENGTH:
+ offset = 0
+ message_type = ""
+ (message_type, offset) = call_reader_function(header, offset, read_int_value)
+ self.chunked_message["Type"] = message_types[message_type]
+ # Connection value is supplied by the caller through the 'connection' parameter
+ self.chunked_message["Connection"] = connection
+ self.chunked_message["Direction"] = "<---"
+ (self.chunked_message["Parts"], offset) = call_reader_function(header, offset, read_int_value)
+ (self.chunked_message["TransactionId"], offset) = call_reader_function(header, offset, read_int_value)
+ chunk_size = 0
+ flags = 0
+ (chunk_size, offset) = call_reader_function(header, offset, read_int_value)
+ (flags, offset) = call_reader_function(header, offset, read_byte_value)
+ self.chunked_message["ChunkInfo"] = []
+ self.add_chunk_header(chunk_size, flags)
+ else:
+ raise IndexError("Chunked message header should be " + str(CHUNKED_MESSAGE_HEADER_LENGTH) + " bytes")
+
+ def add_chunk_header(self, chunk_size, flags):
+ self.chunk_index += 1
+ if len(self.chunked_message) == 0:
+ raise Exception("Can't add chunk header before message header")
+
+ key = "Chunk" + str(self.chunk_index)
+ inner_item = dict(ChunkLength=int(chunk_size), Flags=flags)
+ outer_item = {}
+ outer_item[key] = inner_item
+ self.chunked_message["ChunkInfo"].append(outer_item)
+ self.chunk_flags = flags
+
+ def add_chunk(self, chunk):
+ if len(self.chunked_message) == 0:
+ raise Exception("Can't add chunks before message header")
+
+ self.message_body += chunk
+
+ def is_complete_message(self):
+ return self.chunk_flags & 0x1
+
+ def get_decoded_message(self):
+ return self.chunked_message
+
+ def reset(self):
+ self.header = ""
+ self.message_body = ""
+ self.chunked_message = {}
+ self.complete = False
+ self.chunk_flags = 0xff
+ self.chunk_index = -1
\ No newline at end of file
diff --git a/tools/gnmsg/server_message_decoder.py b/tools/gnmsg/server_message_decoder.py
index 5c3eab5..b6fcc96 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -22,7 +22,8 @@ from server_messages import parse_server_message
from decoder_base import DecoderBase
from message_types import message_types
from numeric_conversion import to_hex_digit
-
+from chunked_message_decoder import ChunkedResponseDecoder
+from read_values import read_number_from_hex_string
class ServerMessageDecoder(DecoderBase):
def __init__(self, output_queue):
@@ -50,6 +51,7 @@ class ServerMessageDecoder(DecoderBase):
"10.1.3": self.parse_response_fields_base,
"9.1.1": self.parse_response_fields_v911,
}
+ self.chunk_decoder = ChunkedResponseDecoder()
def search_for_version(self, line):
if self.nc_version_ == None:
@@ -148,6 +150,93 @@ class ServerMessageDecoder(DecoderBase):
return result
+
+ def get_response_header(self, line, parts):
+ # Check if this is a header for a chunked message
+ result = False
+
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader\(([0-9|a-f|A-F|x]+)\):\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ )
+ match = expression.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ parts.append(match.group(4))
+ result = True
+
+ if not result:
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readResponseHeader:\s*received header from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ )
+ match = expression.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append("")
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ result = True
+
+ return result
+
+ def get_chunk_header(self, line, parts):
+ # Check if this is the header of an individual chunk (readChunkHeader log line)
+ result = False
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader\(([0-9|a-f|A-F|x]+)\):\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+ )
+ match = expression.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ parts.append(match.group(4))
+ result = True
+
+ if not result:
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkHeader:\s*.*, chunkLen=(\d+), lastChunkAndSecurityFlags=([0-9|a-f|A-F|x]+)"
+ )
+ match = expression.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append("")
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ result = True
+
+ return result
+
+ def get_chunk_bytes(self, line, parts):
+ # Check if this is a message chunk body (readChunkBody log line).
+ # If it is, capture its fields in parts; the caller adds it to the chunked decoder
+ result = False
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody\(([0-9|a-f|A-F|x]+)\): \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ )
+ match = expression.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ parts.append(match.group(4))
+ result = True
+
+ if not result:
+ expression = re.compile(
+ r"(\d\d:\d\d:\d\d\.\d+).*TcrConnection::readChunkBody: \s*received chunk body from endpoint\s*([\w|:|\d|\.|-]+);\s*bytes:\s*([\d|a-f|A-F]+)"
+ )
+ match = expression.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append("")
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ result = True
+
+ return result
+
def decimal_string_to_hex_string(self, byte):
high_nibble = int(int(byte) / 16)
low_nibble = int(byte) % 16
@@ -202,6 +291,7 @@ class ServerMessageDecoder(DecoderBase):
connection = None
message_bytes = None
message_body = None
+ chunk_bytes = None
self.search_for_version(line)
@@ -216,6 +306,18 @@ class ServerMessageDecoder(DecoderBase):
message_body = parts[0]
elif self.get_add_security_trace_parts(line, parts):
connection = parts[1]
+ elif self.get_response_header(line, parts):
+ self.chunk_decoder.add_header(parts[1], parts[3])
+ elif self.get_chunk_header(line, parts):
+ flags = 0xff
+ size = 0
+ (flags, size) = read_number_from_hex_string(parts[3], 2, len(parts[3]) - 2)
+ self.chunk_decoder.add_chunk_header(parts[2], flags)
+ elif self.get_chunk_bytes(line, parts):
+ self.chunk_decoder.add_chunk(parts[3])
+ if self.chunk_decoder.is_complete_message():
+ self.output_queue_.put({"message": self.chunk_decoder.get_decoded_message()})
+ self.chunk_decoder.reset()
else:
return