You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tvm.apache.org by GitBox <gi...@apache.org> on 2021/05/24 23:40:01 UTC

[GitHub] [tvm] areusch commented on a change in pull request #7876: [iOS] Add tracker support into ios-rpc application

areusch commented on a change in pull request #7876:
URL: https://github.com/apache/tvm/pull/7876#discussion_r638354133



##########
File path: apps/ios_rpc/tvmrpc/rpc_server.h
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_server.h
+ * \brief RPC Server implementation.
+ */
+#ifndef TVM_APPS_IOS_RPC_SERVER_H_
+#define TVM_APPS_IOS_RPC_SERVER_H_
+
+#include <string>
+#include <future>
+#include <chrono>
+#include <dirent.h>
+
+#include "tvm/runtime/c_runtime_api.h"
+#include "runtime/rpc/rpc_endpoint.h"
+#include "runtime/rpc/rpc_socket_impl.h"
+#include "support/socket.h"
+#include "rpc_tracker_client.h"
+
+namespace tvm {
+namespace runtime {
+
+std::vector<std::string> ListDir(const std::string& dirname) {

Review comment:
       could you comment this function to explain what is returned and what is not, plus any post-processing done?

##########
File path: apps/ios_rpc/tvmrpc/TVMRuntime.mm
##########
@@ -71,88 +73,19 @@ void LogMessageImpl(const std::string& file, int lineno, const std::string& mess
 namespace tvm {
 namespace runtime {
 
-class NSStreamChannel final : public RPCChannel {
- public:
-  explicit NSStreamChannel(NSOutputStream* stream) : stream_(stream) {}
-
-  size_t Send(const void* data, size_t size) final {
-    ssize_t nbytes = [stream_ write:reinterpret_cast<const uint8_t*>(data) maxLength:size];
-    if (nbytes < 0) {
-      NSLog(@"%@", [stream_ streamError].localizedDescription);
-      throw tvm::Error("Stream error");
-    }
-    return nbytes;
-  }
-
-  size_t Recv(void* data, size_t size) final {
-    LOG(FATAL) << "Do not allow explicit receive for";
-    return 0;
-  }
-
- private:
-  NSOutputStream* stream_;
-};
-
-FEventHandler CreateServerEventHandler(NSOutputStream* outputStream, std::string name,
-                                       std::string remote_key) {
-  std::unique_ptr<NSStreamChannel> ch(new NSStreamChannel(outputStream));
-  std::shared_ptr<RPCEndpoint> sess = RPCEndpoint::Create(std::move(ch), name, remote_key);
-  return [sess](const std::string& in_bytes, int flag) {
-    return sess->ServerAsyncIOEventHandler(in_bytes, flag);
-  };
-}
-
-// Runtime environment
-struct RPCEnv {
- public:
-  RPCEnv() {
-    NSString* path = NSTemporaryDirectory();
-    base_ = [path UTF8String];
-    if (base_[base_.length() - 1] != '/') {
-      base_ = base_ + '/';
-    }
-  }
-  // Get Path.
-  std::string GetPath(const std::string& file_name) { return base_ + file_name; }
-
- private:
-  std::string base_;
-};
-
-void LaunchSyncServer() {
-  // only load dylib from frameworks.
-  NSBundle* bundle = [NSBundle mainBundle];
-  NSString* base = [bundle privateFrameworksPath];
-  NSString* path = [base stringByAppendingPathComponent:@"tvm/rpc_config.txt"];
-  std::string name = [path UTF8String];
-  std::ifstream fs(name, std::ios::in);
-  std::string url, key;
-  int port;
-  ICHECK(fs >> url >> port >> key) << "Invalid RPC config file " << name;
-  RPCConnect(url, port, "server:" + key, TVMArgs(nullptr, nullptr, 0))->ServerLoop();
-}
-
 TVM_REGISTER_GLOBAL("tvm.rpc.server.workpath").set_body([](TVMArgs args, TVMRetValue* rv) {
-  static RPCEnv env;
-  *rv = env.GetPath(args[0]);
+  std::string name = args[0];
+  std::string base = [NSTemporaryDirectory() UTF8String];
+  *rv = base + "/" + name;
 });
 
 TVM_REGISTER_GLOBAL("tvm.rpc.server.load_module").set_body([](TVMArgs args, TVMRetValue* rv) {
   std::string name = args[0];
-  std::string fmt = GetFileFormat(name, "");

Review comment:
       could you comment why this is going away?

##########
File path: apps/ios_rpc/tvmrpc/rpc_server.h
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_server.h
+ * \brief RPC Server implementation.
+ */
+#ifndef TVM_APPS_IOS_RPC_SERVER_H_
+#define TVM_APPS_IOS_RPC_SERVER_H_
+
+#include <string>
+#include <future>
+#include <chrono>
+#include <dirent.h>
+
+#include "tvm/runtime/c_runtime_api.h"
+#include "runtime/rpc/rpc_endpoint.h"
+#include "runtime/rpc/rpc_socket_impl.h"
+#include "support/socket.h"
+#include "rpc_tracker_client.h"
+
+namespace tvm {
+namespace runtime {
+
+std::vector<std::string> ListDir(const std::string& dirname) {
+  std::vector<std::string> vec;
+  DIR* dp = opendir(dirname.c_str());
+  if (dp == nullptr) {
+    int errsv = errno;
+    LOG(FATAL) << "ListDir " << dirname << " error: " << strerror(errsv);
+  }
+  dirent* d;
+  while ((d = readdir(dp)) != nullptr) {
+    std::string filename = d->d_name;
+    if (filename != "." && filename != "..") {
+      std::string f = dirname;
+      if (f[f.length() - 1] != '/') {
+        f += '/';
+      }
+      f += d->d_name;
+      vec.push_back(f);
+    }
+  }
+  closedir(dp);
+  return vec;
+}
+
+/*!
+ * \brief CleanDir Removes the files from the directory
+ * \param dirname The name of the directory
+ */
+void CleanDir(const std::string& dirname) {
+  auto files = ListDir(dirname);
+  for (const auto& filename : files) {
+    std::string file_path = dirname + "/";
+    file_path += filename;
+    const int ret = std::remove(filename.c_str());
+    if (ret != 0) {
+      LOG(WARNING) << "Remove file " << filename << " failed";
+    }
+  }
+}
+
+// Runtime environment
+struct RPCEnv {
+ public:
+  RPCEnv(const std::string &base):base_(base) {}
+  // Get Path.
+  std::string GetPath(const std::string& file_name) { return base_ + file_name; }
+
+  void CleanUp() const {
+    CleanDir(base_);
+  }
+ private:
+  std::string base_;
+};
+
+
+/*!
+ * \brief RPCServer RPC Server class.
+ * \param host The hostname of the server, Default=0.0.0.0
+ * \param port The port of the RPC, Default=9090
+ * \param port_end The end search port of the RPC, Default=9099
+ * \param tracker The address of RPC tracker in host:port format e.g. 10.77.1.234:9190 Default=""
+ * \param key The key used to identify the device type in tracker. Default=""
+ * \param custom_addr Custom IP Address to Report to RPC Tracker. Default=""
+ */
+class RPCServer {
+ public:
+  /*!
+   * \brief Constructor.
+   */
+  RPCServer(std::string host, int port, int port_end, std::string tracker_addr, std::string key,
+            std::string custom_addr, std::string work_dir)
+      : host_(std::move(host)),
+        port_(port),
+        my_port_(0),
+        port_end_(port_end),
+        tracker_addr_(std::move(tracker_addr)),
+        key_(std::move(key)),
+        custom_addr_(std::move(custom_addr)),
+        work_dir_(std::move(work_dir)),
+        tracker_(tracker_addr_, key_, custom_addr_) {}
+
+  /*!
+   * \brief Destructor.
+   */
+  ~RPCServer() {
+    try {
+      // Free the resources
+      listen_sock_.Close();
+      tracker_.Close();
+    } catch (...) {
+    }
+  }
+
+  /*!
+   * \brief Start Creates the RPC listen process and execution.
+   */
+  void Start() {
+    listen_sock_.Create();
+    my_port_ = listen_sock_.TryBindHost(host_, port_, port_end_);
+    LOG(INFO) << "bind to " << host_ << ":" << my_port_;
+    listen_sock_.Listen(1);
+    continue_processing = true;
+    proc_ = std::future<void>(std::async(std::launch::async, &RPCServer::ListenLoopProc, this));
+  }
+  
+  void Stop() {
+    continue_processing = false;
+    tracker_.Close();
+  }
+    
+  void setCompletionCallbacks(std::function<void()> callback_start, std::function<void()> callback_stop) {
+    completion_callback_start_ = callback_start;
+    completion_callback_stop_ = callback_stop;
+  }
+
+ private:
+  /*!
+   * \brief ListenLoopProc The listen process.
+   */
+  void ListenLoopProc() {
+    
+    while (continue_processing) {
+      support::TCPSocket conn;
+      support::SockAddr addr("0.0.0.0", 0);
+      std::string opts;
+      try {
+        // step 1: setup tracker and report to tracker
+        tracker_.TryConnect();
+        if (completion_callback_start_)
+          completion_callback_start_();
+        // step 2: wait for in-coming connections
+        AcceptConnection(&tracker_, &conn, &addr, &opts);
+      } catch (const char* msg) {
+        LOG(WARNING) << "Socket exception: " << msg;
+        // close tracker resource
+        tracker_.Close();
+        continue;
+      } catch (const std::exception& e) {
+        // close tracker resource
+        tracker_.Close();
+        LOG(WARNING) << "Exception standard: " << e.what();

Review comment:
       what's "standard" mean? should this come before `tracker_.Close()` in case that throws an error?

##########
File path: apps/ios_rpc/tvmrpc/TVMRuntime.mm
##########
@@ -71,88 +73,19 @@ void LogMessageImpl(const std::string& file, int lineno, const std::string& mess
 namespace tvm {
 namespace runtime {
 
-class NSStreamChannel final : public RPCChannel {

Review comment:
       just curious why this is going away? not needed if we use standard sockets?

##########
File path: apps/ios_rpc/tvmrpc/rpc_server.h
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_server.h
+ * \brief RPC Server implementation.
+ */
+#ifndef TVM_APPS_IOS_RPC_SERVER_H_
+#define TVM_APPS_IOS_RPC_SERVER_H_
+
+#include <string>
+#include <future>
+#include <chrono>
+#include <dirent.h>
+
+#include "tvm/runtime/c_runtime_api.h"
+#include "runtime/rpc/rpc_endpoint.h"
+#include "runtime/rpc/rpc_socket_impl.h"
+#include "support/socket.h"
+#include "rpc_tracker_client.h"
+
+namespace tvm {
+namespace runtime {
+
+std::vector<std::string> ListDir(const std::string& dirname) {
+  std::vector<std::string> vec;
+  DIR* dp = opendir(dirname.c_str());
+  if (dp == nullptr) {
+    int errsv = errno;
+    LOG(FATAL) << "ListDir " << dirname << " error: " << strerror(errsv);
+  }
+  dirent* d;
+  while ((d = readdir(dp)) != nullptr) {
+    std::string filename = d->d_name;
+    if (filename != "." && filename != "..") {
+      std::string f = dirname;
+      if (f[f.length() - 1] != '/') {
+        f += '/';
+      }
+      f += d->d_name;
+      vec.push_back(f);
+    }
+  }
+  closedir(dp);
+  return vec;
+}
+
+/*!
+ * \brief CleanDir Removes the files from the directory
+ * \param dirname The name of the directory
+ */
+void CleanDir(const std::string& dirname) {
+  auto files = ListDir(dirname);
+  for (const auto& filename : files) {
+    std::string file_path = dirname + "/";
+    file_path += filename;
+    const int ret = std::remove(filename.c_str());
+    if (ret != 0) {
+      LOG(WARNING) << "Remove file " << filename << " failed";
+    }
+  }
+}
+
+// Runtime environment
+struct RPCEnv {
+ public:
+  RPCEnv(const std::string &base):base_(base) {}
+  // Get Path.
+  std::string GetPath(const std::string& file_name) { return base_ + file_name; }
+
+  void CleanUp() const {
+    CleanDir(base_);
+  }
+ private:
+  std::string base_;
+};
+
+
+/*!
+ * \brief RPCServer RPC Server class.
+ * \param host The hostname of the server, Default=0.0.0.0
+ * \param port The port of the RPC, Default=9090
+ * \param port_end The end search port of the RPC, Default=9099
+ * \param tracker The address of RPC tracker in host:port format e.g. 10.77.1.234:9190 Default=""
+ * \param key The key used to identify the device type in tracker. Default=""
+ * \param custom_addr Custom IP Address to Report to RPC Tracker. Default=""
+ */
+class RPCServer {
+ public:
+  /*!
+   * \brief Constructor.
+   */
+  RPCServer(std::string host, int port, int port_end, std::string tracker_addr, std::string key,
+            std::string custom_addr, std::string work_dir)
+      : host_(std::move(host)),
+        port_(port),
+        my_port_(0),
+        port_end_(port_end),
+        tracker_addr_(std::move(tracker_addr)),
+        key_(std::move(key)),
+        custom_addr_(std::move(custom_addr)),
+        work_dir_(std::move(work_dir)),
+        tracker_(tracker_addr_, key_, custom_addr_) {}
+
+  /*!
+   * \brief Destructor.
+   */
+  ~RPCServer() {
+    try {
+      // Free the resources
+      listen_sock_.Close();
+      tracker_.Close();
+    } catch (...) {
+    }
+  }
+
+  /*!
+   * \brief Start Creates the RPC listen process and execution.
+   */
+  void Start() {
+    listen_sock_.Create();
+    my_port_ = listen_sock_.TryBindHost(host_, port_, port_end_);
+    LOG(INFO) << "bind to " << host_ << ":" << my_port_;
+    listen_sock_.Listen(1);
+    continue_processing = true;
+    proc_ = std::future<void>(std::async(std::launch::async, &RPCServer::ListenLoopProc, this));
+  }
+  
+  void Stop() {
+    continue_processing = false;
+    tracker_.Close();
+  }
+    
+  void setCompletionCallbacks(std::function<void()> callback_start, std::function<void()> callback_stop) {
+    completion_callback_start_ = callback_start;
+    completion_callback_stop_ = callback_stop;
+  }
+
+ private:
+  /*!
+   * \brief ListenLoopProc The listen process.
+   */
+  void ListenLoopProc() {
+    
+    while (continue_processing) {
+      support::TCPSocket conn;
+      support::SockAddr addr("0.0.0.0", 0);
+      std::string opts;
+      try {
+        // step 1: setup tracker and report to tracker
+        tracker_.TryConnect();
+        if (completion_callback_start_)
+          completion_callback_start_();
+        // step 2: wait for in-coming connections
+        AcceptConnection(&tracker_, &conn, &addr, &opts);
+      } catch (const char* msg) {
+        LOG(WARNING) << "Socket exception: " << msg;
+        // close tracker resource
+        tracker_.Close();
+        continue;
+      } catch (const std::exception& e) {
+        // close tracker resource
+        tracker_.Close();
+        LOG(WARNING) << "Exception standard: " << e.what();
+        continue;
+      }
+
+      auto start_time = std::chrono::high_resolution_clock::now();
+      ServerLoopProc(conn, addr, work_dir_);
+      auto dur = std::chrono::high_resolution_clock::now() - start_time;
+
+      LOG(INFO) << "Serve Time " << std::chrono::duration_cast<std::chrono::milliseconds>(dur).count() << "ms";
+
+      // close from our side.
+      LOG(INFO) << "Socket Connection Closed";
+      conn.Close();
+    }
+    if (completion_callback_stop_)
+      completion_callback_stop_();
+
+  }
+
+  /*!
+   * \brief AcceptConnection Accepts the RPC Server connection.
+   * \param tracker Tracker details.
+   * \param conn_sock New connection information.
+   * \param addr New connection address information.
+   * \param opts Parsed options for socket
+   * \param ping_period Timeout for select call waiting
+   */
+  void AcceptConnection(TrackerClient* tracker, support::TCPSocket* conn_sock,
+                        support::SockAddr* addr, std::string* opts, int ping_period = 2) {
+    std::set<std::string> old_keyset;
+    std::string matchkey;
+
+    // Report resource to tracker and get key
+    tracker->ReportResourceAndGetKey(my_port_, &matchkey);
+
+    while (continue_processing) {
+      tracker->WaitConnectionAndUpdateKey(listen_sock_, my_port_, ping_period, &matchkey);
+      support::TCPSocket conn = listen_sock_.Accept(addr);
+
+      int code = kRPCMagic;
+      ICHECK_EQ(conn.RecvAll(&code, sizeof(code)), sizeof(code));
+      if (code != kRPCMagic) {
+        conn.Close();
+        LOG(FATAL) << "Client connected is not TVM RPC server";
+        continue;
+      }
+
+      int keylen = 0;
+      ICHECK_EQ(conn.RecvAll(&keylen, sizeof(keylen)), sizeof(keylen));
+
+      const char* CLIENT_HEADER = "client:";
+      const char* SERVER_HEADER = "server:";
+      std::string expect_header = CLIENT_HEADER + matchkey;
+      std::string server_key = SERVER_HEADER + key_;
+      if (size_t(keylen) < expect_header.length()) {
+        conn.Close();
+        LOG(INFO) << "Wrong client header length";
+        continue;
+      }
+
+      ICHECK_NE(keylen, 0);
+      std::string remote_key;
+      remote_key.resize(keylen);
+      ICHECK_EQ(conn.RecvAll(&remote_key[0], keylen), keylen);
+
+      std::stringstream ssin(remote_key);
+      std::string arg0;
+      ssin >> arg0;
+
+      if (arg0 != expect_header) {
+        code = kRPCMismatch;
+        ICHECK_EQ(conn.SendAll(&code, sizeof(code)), sizeof(code));
+        conn.Close();
+        LOG(WARNING) << "Mismatch key from" << addr->AsString();
+        continue;
+      } else {
+        code = kRPCSuccess;
+        ICHECK_EQ(conn.SendAll(&code, sizeof(code)), sizeof(code));
+        keylen = int(server_key.length());
+        ICHECK_EQ(conn.SendAll(&keylen, sizeof(keylen)), sizeof(keylen));
+        ICHECK_EQ(conn.SendAll(server_key.c_str(), keylen), keylen);
+        LOG(INFO) << "Connection success " << addr->AsString();
+        ssin >> *opts;
+        *conn_sock = conn;
+        return;
+      }
+    }
+  }
+
+  /*!
+   * \brief ServerLoopProc The Server loop process.
+   * \param sock The socket information
+   * \param addr The socket address information
+   */
+  static void ServerLoopProc(support::TCPSocket sock, support::SockAddr addr,
+                             std::string work_dir) {
+    // Server loop
+    const auto env = RPCEnv(work_dir);
+    RPCServerLoop(int(sock.sockfd));
+    LOG(INFO) << "Finish serving " << addr.AsString();
+    env.CleanUp();
+  }
+
+  /*!
+   * \brief GetTimeOutFromOpts Parse and get the timeout option.
+   * \param opts The option string
+   */
+  int GetTimeOutFromOpts(const std::string& opts) const {
+    const std::string option = "-timeout=";

Review comment:
       not sure if this should also have units? `-timeout-sec`

##########
File path: apps/ios_rpc/tvmrpc/TVMRuntime.mm
##########
@@ -22,36 +22,38 @@
  */
 #include "TVMRuntime.h"
 // Runtime API
-#include "../../../src/runtime/c_runtime_api.cc"
-#include "../../../src/runtime/cpu_device_api.cc"
-#include "../../../src/runtime/dso_library.cc"
-#include "../../../src/runtime/file_utils.cc"
-#include "../../../src/runtime/library_module.cc"
-#include "../../../src/runtime/metadata_module.cc"
-#include "../../../src/runtime/module.cc"
-#include "../../../src/runtime/ndarray.cc"
-#include "../../../src/runtime/object.cc"
-#include "../../../src/runtime/registry.cc"
-#include "../../../src/runtime/system_library.cc"
-#include "../../../src/runtime/thread_pool.cc"
-#include "../../../src/runtime/threading_backend.cc"
-#include "../../../src/runtime/workspace_pool.cc"
-
-// RPC server
-#include "../../../src/runtime/rpc/rpc_channel.cc"
-#include "../../../src/runtime/rpc/rpc_endpoint.cc"
-#include "../../../src/runtime/rpc/rpc_local_session.cc"
-#include "../../../src/runtime/rpc/rpc_module.cc"
-#include "../../../src/runtime/rpc/rpc_server_env.cc"
-#include "../../../src/runtime/rpc/rpc_session.cc"
-#include "../../../src/runtime/rpc/rpc_socket_impl.cc"
-// Graph executor
-#include "../../../src/runtime/graph_executor/graph_executor.cc"
-// Metal
-#include "../../../src/runtime/metal/metal_device_api.mm"
-#include "../../../src/runtime/metal/metal_module.mm"
-// CoreML
-#include "../../../src/runtime/contrib/coreml/coreml_runtime.mm"
+//#include "../../../src/runtime/c_runtime_api.cc"

Review comment:
       since i can't read XCode project files--is this change adjusting the XCode project to depend on libtvmruntime.so somehow?

##########
File path: apps/ios_rpc/tvmrpc/rpc_server.h
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*!
+ * \file rpc_server.h
+ * \brief RPC Server implementation.
+ */
+#ifndef TVM_APPS_IOS_RPC_SERVER_H_
+#define TVM_APPS_IOS_RPC_SERVER_H_
+
+#include <string>
+#include <future>
+#include <chrono>
+#include <dirent.h>
+
+#include "tvm/runtime/c_runtime_api.h"
+#include "runtime/rpc/rpc_endpoint.h"
+#include "runtime/rpc/rpc_socket_impl.h"
+#include "support/socket.h"
+#include "rpc_tracker_client.h"
+
+namespace tvm {
+namespace runtime {
+
+std::vector<std::string> ListDir(const std::string& dirname) {
+  std::vector<std::string> vec;
+  DIR* dp = opendir(dirname.c_str());
+  if (dp == nullptr) {
+    int errsv = errno;
+    LOG(FATAL) << "ListDir " << dirname << " error: " << strerror(errsv);
+  }
+  dirent* d;
+  while ((d = readdir(dp)) != nullptr) {
+    std::string filename = d->d_name;
+    if (filename != "." && filename != "..") {
+      std::string f = dirname;
+      if (f[f.length() - 1] != '/') {
+        f += '/';
+      }
+      f += d->d_name;
+      vec.push_back(f);
+    }
+  }
+  closedir(dp);
+  return vec;
+}
+
+/*!
+ * \brief CleanDir Removes the files from the directory
+ * \param dirname The name of the directory
+ */
+void CleanDir(const std::string& dirname) {
+  auto files = ListDir(dirname);
+  for (const auto& filename : files) {
+    std::string file_path = dirname + "/";
+    file_path += filename;
+    const int ret = std::remove(filename.c_str());
+    if (ret != 0) {
+      LOG(WARNING) << "Remove file " << filename << " failed";
+    }
+  }
+}
+
+// Runtime environment
+struct RPCEnv {
+ public:
+  RPCEnv(const std::string &base):base_(base) {}
+  // Get Path.
+  std::string GetPath(const std::string& file_name) { return base_ + file_name; }
+
+  void CleanUp() const {
+    CleanDir(base_);
+  }
+ private:
+  std::string base_;
+};
+
+
+/*!
+ * \brief RPCServer RPC Server class.
+ * \param host The hostname of the server, Default=0.0.0.0
+ * \param port The port of the RPC, Default=9090
+ * \param port_end The end search port of the RPC, Default=9099
+ * \param tracker The address of RPC tracker in host:port format e.g. 10.77.1.234:9190 Default=""
+ * \param key The key used to identify the device type in tracker. Default=""
+ * \param custom_addr Custom IP Address to Report to RPC Tracker. Default=""
+ */
+class RPCServer {
+ public:
+  /*!
+   * \brief Constructor.
+   */
+  RPCServer(std::string host, int port, int port_end, std::string tracker_addr, std::string key,
+            std::string custom_addr, std::string work_dir)
+      : host_(std::move(host)),
+        port_(port),
+        my_port_(0),
+        port_end_(port_end),
+        tracker_addr_(std::move(tracker_addr)),
+        key_(std::move(key)),
+        custom_addr_(std::move(custom_addr)),
+        work_dir_(std::move(work_dir)),
+        tracker_(tracker_addr_, key_, custom_addr_) {}
+
+  /*!
+   * \brief Destructor.
+   */
+  ~RPCServer() {
+    try {
+      // Free the resources
+      listen_sock_.Close();
+      tracker_.Close();
+    } catch (...) {
+    }
+  }
+
+  /*!
+   * \brief Start Creates the RPC listen process and execution.
+   */
+  void Start() {
+    listen_sock_.Create();
+    my_port_ = listen_sock_.TryBindHost(host_, port_, port_end_);
+    LOG(INFO) << "bind to " << host_ << ":" << my_port_;
+    listen_sock_.Listen(1);
+    continue_processing = true;
+    proc_ = std::future<void>(std::async(std::launch::async, &RPCServer::ListenLoopProc, this));
+  }
+  
+  void Stop() {
+    continue_processing = false;
+    tracker_.Close();
+  }
+    
+  void setCompletionCallbacks(std::function<void()> callback_start, std::function<void()> callback_stop) {
+    completion_callback_start_ = callback_start;
+    completion_callback_stop_ = callback_stop;
+  }
+
+ private:
+  /*!
+   * \brief ListenLoopProc The listen process.
+   */
+  void ListenLoopProc() {
+    
+    while (continue_processing) {
+      support::TCPSocket conn;
+      support::SockAddr addr("0.0.0.0", 0);
+      std::string opts;
+      try {
+        // step 1: setup tracker and report to tracker
+        tracker_.TryConnect();
+        if (completion_callback_start_)
+          completion_callback_start_();
+        // step 2: wait for in-coming connections
+        AcceptConnection(&tracker_, &conn, &addr, &opts);
+      } catch (const char* msg) {
+        LOG(WARNING) << "Socket exception: " << msg;
+        // close tracker resource
+        tracker_.Close();
+        continue;
+      } catch (const std::exception& e) {
+        // close tracker resource
+        tracker_.Close();
+        LOG(WARNING) << "Exception standard: " << e.what();
+        continue;
+      }
+
+      auto start_time = std::chrono::high_resolution_clock::now();
+      ServerLoopProc(conn, addr, work_dir_);
+      auto dur = std::chrono::high_resolution_clock::now() - start_time;
+
+      LOG(INFO) << "Serve Time " << std::chrono::duration_cast<std::chrono::milliseconds>(dur).count() << "ms";
+
+      // close from our side.
+      LOG(INFO) << "Socket Connection Closed";
+      conn.Close();
+    }
+    if (completion_callback_stop_)
+      completion_callback_stop_();
+
+  }
+
+  /*!
+   * \brief AcceptConnection Accepts the RPC Server connection.
+   * \param tracker Tracker details.
+   * \param conn_sock New connection information.
+   * \param addr New connection address information.
+   * \param opts Parsed options for socket
+   * \param ping_period Timeout for select call waiting
+   */
+  void AcceptConnection(TrackerClient* tracker, support::TCPSocket* conn_sock,
+                        support::SockAddr* addr, std::string* opts, int ping_period = 2) {

Review comment:
       specify units: `ping_period_sec`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org