You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tvm.apache.org by GitBox <gi...@apache.org> on 2022/04/14 19:18:36 UTC

[GitHub] [tvm] mkatanbaf commented on a diff in pull request #10967: [rpc] Implemented rpc logging

mkatanbaf commented on code in PR #10967:
URL: https://github.com/apache/tvm/pull/10967#discussion_r850736863


##########
src/runtime/minrpc/minrpc_server_logging.h:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TVM_RUNTIME_MINRPC_MINRPC_SERVER_LOGGING_H_
+#define TVM_RUNTIME_MINRPC_MINRPC_SERVER_LOGGING_H_
+
+#include "minrpc_logger.h"
+#include "minrpc_server.h"
+
+namespace tvm {
+namespace runtime {
+
+/*!
+ * \brief A minimum RPC server that logs the received commands.
+ *
+ * \tparam TIOHandler IO provider to provide io handling.
+ */
+template <typename TIOHandler>
+class MinRPCServerWithLog {
+ public:
+  explicit MinRPCServerWithLog(TIOHandler* io)
+      : ret(io), retWLog(&ret), exec(io, &retWLog), execWLog(&exec), next_(io, &execWLog) {}
+
+  bool ProcessOnePacket() { return next_.ProcessOnePacket(); }
+
+ private:
+  MinRPCReturns<TIOHandler> ret;
+  MinRPCExecute<TIOHandler> exec;
+  MinRPCReturnsWithLog retWLog;
+  MinRPCExecuteWithLog execWLog;
+  MinRPCServer<TIOHandler> next_;
+};
+
+/*!
+ * \brief A minimum RPC server that only logs the outgoing commands and received responses.
+ * (Does not process the packets or respond to them.)
+ *
+ * \tparam TIOHandler IO provider to provide io handling.
+ */
+template <typename TIOHandler>
+class MinRPCSniffer {
+ public:
+  explicit MinRPCSniffer(TIOHandler* io)
+      : io_(io), ret(io), retWLog(&ret), exec(&retWLog), execWLog(&exec), next_(io, &execWLog) {}
+
+  bool ProcessOnePacket() { return next_.ProcessOnePacket(); }
+
+  void ProcessOneResponse() {
+    RPCCode code;
+    uint64_t packet_len = 0;
+
+    this->Read(&packet_len);
+    if (packet_len == 0) OutputLog();
+    this->Read(&code);
+    switch (code) {
+      case RPCCode::kReturn: {
+        HandleReturn();
+        break;
+      }
+      case RPCCode::kException: {
+        retWLog.ReturnException("");
+        break;
+      }
+      default: {
+        OutputLog();
+        break;
+      }
+    }
+  }
+
+ private:
+  void HandleReturn() {
+    int32_t num_args;
+    int32_t tcode;
+
+    this->Read(&num_args);
+    if (num_args == 1) {
+      this->Read(&tcode);
+      if (tcode == kTVMNullptr) {
+        retWLog.ReturnVoid();
+        return;
+      }
+      if (tcode == kTVMOpaqueHandle) {
+        uint64_t handle;
+        this->Read(&handle);
+        retWLog.ReturnHandle(reinterpret_cast<void*>(handle));
+        return;
+      }
+    }
+    OutputLog();
+  }
+
+  void OutputLog() { retWLog.getLogger()->OutputLog(); }
+
+  template <typename T>
+  void Read(T* data) {
+    static_assert(std::is_pod<T>::value, "need to be trival");
+    ReadRawBytes(data, sizeof(T));
+  }
+
+  void ReadRawBytes(void* data, size_t size) {
+    uint8_t* buf = reinterpret_cast<uint8_t*>(data);
+    size_t ndone = 0;
+    while (ndone < size) {
+      ssize_t ret = io_->PosixRead(buf, size - ndone);
+      if (ret <= 0) {
+        retWLog.getLogger()->LogString("-> No Response Received.");
+        break;
+      }
+      ndone += ret;
+      buf += ret;
+    }
+  }
+
+  TIOHandler* io_;

Review Comment:
   thanks, fixed it!



##########
src/runtime/rpc/rpc_channel_logger.h:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_channel_logger.h
+ * \brief A wrapper for RPCChannel with a NanoRPCListener for logging the commands.
+ */
+#ifndef TVM_RUNTIME_RPC_RPC_CHANNEL_LOGGER_H_
+#define TVM_RUNTIME_RPC_RPC_CHANNEL_LOGGER_H_
+
+#include <memory>
+#include <utility>
+
+#include "../minrpc/minrpc_server_logging.h"
+#include "rpc_channel.h"
+
+#define RX_BUFFER_SIZE 65536
+
+namespace tvm {
+namespace runtime {
+
+class Buffer {
+ public:
+  Buffer(uint8_t* data, size_t data_size_bytes)
+      : data_{data}, capacity_{data_size_bytes}, num_valid_bytes_{0}, read_cursor_{0} {}
+
+  size_t Write(const uint8_t* data, size_t data_size_bytes) {
+    size_t num_bytes_available = capacity_ - num_valid_bytes_;
+    size_t num_bytes_to_copy = data_size_bytes;
+    if (num_bytes_available < num_bytes_to_copy) {
+      num_bytes_to_copy = num_bytes_available;
+    }
+
+    memcpy(&data_[num_valid_bytes_], data, num_bytes_to_copy);
+    num_valid_bytes_ += num_bytes_to_copy;
+    return num_bytes_to_copy;
+  }
+
+  size_t Read(uint8_t* data, size_t data_size_bytes) {
+    size_t num_bytes_to_copy = data_size_bytes;
+    size_t num_bytes_available = num_valid_bytes_ - read_cursor_;
+    if (num_bytes_available < num_bytes_to_copy) {
+      num_bytes_to_copy = num_bytes_available;
+    }
+
+    memcpy(data, &data_[read_cursor_], num_bytes_to_copy);
+    read_cursor_ += num_bytes_to_copy;
+    return num_bytes_to_copy;
+  }
+
+  void Clear() {
+    num_valid_bytes_ = 0;
+    read_cursor_ = 0;
+  }
+
+  size_t ReadAvailable() const { return num_valid_bytes_ - read_cursor_; }
+
+  size_t Size() const { return num_valid_bytes_; }
+
+ private:
+  /*! \brief pointer to data buffer. */
+  uint8_t* data_;
+
+  /*! \brief The total number of bytes available in data_. Always a power of 2. */
+  size_t capacity_;
+
+  /*! \brief index into data_ of the next potentially-available byte in the buffer.
+   * The byte is available when tail_ != data_ + capacity_.
+   */
+  size_t num_valid_bytes_;
+
+  /*! \brief Read cursor position. */
+  size_t read_cursor_;
+};
+
+/*!
+ * \brief A simple IO handler for MinRPCSniffer.
+ *
+ * \tparam micro_rpc::FrameBuffer* buffer to store received data.
+ */
+class SnifferIOHandler {
+ public:
+  explicit SnifferIOHandler(Buffer* receive_buffer) : receive_buffer_(receive_buffer) {}
+
+  void MessageStart(size_t message_size_bytes) {}
+
+  ssize_t PosixWrite(const uint8_t* buf, size_t buf_size_bytes) { return 0; }
+
+  void MessageDone() {}
+
+  ssize_t PosixRead(uint8_t* buf, size_t buf_size_bytes) {
+    return receive_buffer_->Read(buf, buf_size_bytes);
+  }
+
+  void Close() {}
+
+  void Exit(int code) {}
+
+ private:
+  Buffer* receive_buffer_;
+};
+
+/*!
+ * \brief A simple rpc session that logs the received commands.
+ */
+class NanoRPCListener {
+ public:
+  NanoRPCListener()
+      : receive_buffer_(receive_storage, receive_storage_size_bytes),
+        io_(&receive_buffer_),
+        rpc_server_(&io_) {}
+
+  void Listen(const uint8_t* data, size_t size) { receive_buffer_.Write(data, size); }
+
+  void ProcessTxPacket() {
+    rpc_server_.ProcessOnePacket();
+    ClearBuffer();
+  }
+
+  void ProcessRxPacket() {
+    rpc_server_.ProcessOneResponse();
+    ClearBuffer();
+  }
+
+ private:
+  void ClearBuffer() { receive_buffer_.Clear(); }
+
+ private:
+  size_t receive_storage_size_bytes = RX_BUFFER_SIZE;

Review Comment:
   thanks, fixed it!



##########
src/runtime/rpc/rpc_channel_logger.h:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_channel_logger.h
+ * \brief A wrapper for RPCChannel with a NanoRPCListener for logging the commands.
+ */
+#ifndef TVM_RUNTIME_RPC_RPC_CHANNEL_LOGGER_H_
+#define TVM_RUNTIME_RPC_RPC_CHANNEL_LOGGER_H_
+
+#include <memory>
+#include <utility>
+
+#include "../minrpc/minrpc_server_logging.h"
+#include "rpc_channel.h"
+
+#define RX_BUFFER_SIZE 65536
+
+namespace tvm {
+namespace runtime {
+
+class Buffer {
+ public:
+  Buffer(uint8_t* data, size_t data_size_bytes)
+      : data_{data}, capacity_{data_size_bytes}, num_valid_bytes_{0}, read_cursor_{0} {}
+
+  size_t Write(const uint8_t* data, size_t data_size_bytes) {
+    size_t num_bytes_available = capacity_ - num_valid_bytes_;
+    size_t num_bytes_to_copy = data_size_bytes;
+    if (num_bytes_available < num_bytes_to_copy) {
+      num_bytes_to_copy = num_bytes_available;
+    }
+
+    memcpy(&data_[num_valid_bytes_], data, num_bytes_to_copy);
+    num_valid_bytes_ += num_bytes_to_copy;
+    return num_bytes_to_copy;
+  }
+
+  size_t Read(uint8_t* data, size_t data_size_bytes) {
+    size_t num_bytes_to_copy = data_size_bytes;
+    size_t num_bytes_available = num_valid_bytes_ - read_cursor_;
+    if (num_bytes_available < num_bytes_to_copy) {
+      num_bytes_to_copy = num_bytes_available;
+    }
+
+    memcpy(data, &data_[read_cursor_], num_bytes_to_copy);
+    read_cursor_ += num_bytes_to_copy;
+    return num_bytes_to_copy;
+  }
+
+  void Clear() {
+    num_valid_bytes_ = 0;
+    read_cursor_ = 0;
+  }
+
+  size_t ReadAvailable() const { return num_valid_bytes_ - read_cursor_; }
+
+  size_t Size() const { return num_valid_bytes_; }
+
+ private:
+  /*! \brief pointer to data buffer. */
+  uint8_t* data_;
+
+  /*! \brief The total number of bytes available in data_. Always a power of 2. */
+  size_t capacity_;
+
+  /*! \brief index into data_ of the next potentially-available byte in the buffer.
+   * The byte is available when tail_ != data_ + capacity_.

Review Comment:
   thanks, fixed it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@tvm.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org