You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/04/04 11:09:46 UTC

[3/4] incubator-singa git commit: SINGA-156 Remove the dependency on ZMQ for single process training

SINGA-156 Remove the dependency on ZMQ for single process training

Move the msg queue initialization out of Driver::Train and into the Dealer and Router constructors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/42f5253e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/42f5253e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/42f5253e

Branch: refs/heads/master
Commit: 42f5253eacde9a0ab87d3b4ed2382a137d9652d6
Parents: 65b8c8d
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Mon Apr 4 16:52:51 2016 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Mon Apr 4 16:52:51 2016 +0800

----------------------------------------------------------------------
 include/singa/comm/socket.h |  3 ++-
 include/singa/server.h      |  2 ++
 src/comm/socket.cc          |  8 ++++++++
 src/driver.cc               | 12 ------------
 src/server.cc               | 12 ++++++------
 src/worker.cc               |  2 +-
 6 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/include/singa/comm/socket.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/socket.h b/include/singa/comm/socket.h
index de8cbde..3194d8c 100644
--- a/include/singa/comm/socket.h
+++ b/include/singa/comm/socket.h
@@ -43,7 +43,7 @@ class Dealer {
    /**
     * @param id used for identifying the msg queue of this dealer.
     */
-   Dealer(int id) : id_(id) {}
+   Dealer(int id);
   ~Dealer();
   /**
     * Setup the connection with the remote router.
@@ -83,6 +83,7 @@ class Dealer {
 class Router {
  public:
   ~Router();
+  Router();
   /**
    * Bind the router to an endpoint for recv msg from remote dealer.
    * If the router is used for intra-communication only, then no need to call

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/include/singa/server.h
----------------------------------------------------------------------
diff --git a/include/singa/server.h b/include/singa/server.h
index 4bffeae..d95862d 100644
--- a/include/singa/server.h
+++ b/include/singa/server.h
@@ -126,6 +126,8 @@ class Server {
   std::vector<int> n_pending_sync_;
   std::vector<Blob<float>> last_sync_;
   std::unordered_map<int, std::vector<Msg*>> buffer_requests_;
+
+  Dealer* dealer_;
 };
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/comm/socket.cc
----------------------------------------------------------------------
diff --git a/src/comm/socket.cc b/src/comm/socket.cc
index 8245398..aa1ee85 100644
--- a/src/comm/socket.cc
+++ b/src/comm/socket.cc
@@ -31,6 +31,10 @@ Dealer::~Dealer() {
 #endif
 }
 
+Dealer::Dealer(int id) : id_ (id) {
+  msgQueues[id];
+}
+
 int Dealer::Connect(const std::string& endpoint) {
   if (endpoint.length() > 0) {
 #ifdef USE_ZMQ
@@ -79,6 +83,10 @@ Router::~Router() {
 #endif
 }
 
+Router::Router() {
+  msgQueues[-1];
+}
+
 int Router::Bind(const std::string& endpoint) {
   int port = -1;
   if (endpoint.length() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 2952c62..b8f6735 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -232,18 +232,6 @@ void Driver::Train(const JobProto& job_conf) {
       net->ToGraph(true).ToJson());
   const vector<Worker*> workers = CreateWorkers(job_conf, net);
   const vector<Server*> servers = CreateServers(job_conf, net);
-  // Add msg queues for each socket
-  for (auto worker : workers) {
-    msgQueues[Addr(worker->grp_id(), worker->id(), kWorkerParam)];
-    msgQueues[Addr(worker->grp_id(), worker->id(), kWorkerLayer)];
-//    LOG(ERROR) << "worker addr " << Addr(worker->grp_id(), worker->id(), kWorkerParam);
-//    LOG(ERROR) << "worker addr " << Addr(worker->grp_id(), worker->id(), kWorkerLayer);
-  }
-  for (auto server : servers) {
-    msgQueues[Addr(server->grp_id(), server->id(), kServer)];
-//    LOG(ERROR) << "server addr " << Addr(server->grp_id(), server->id(), kServer);
-  }
-  msgQueues[-1];
 
   vector<std::thread> threads;
   for (auto server : servers)

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/server.cc
----------------------------------------------------------------------
diff --git a/src/server.cc b/src/server.cc
index d5ef028..3b72243 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -44,6 +44,7 @@ Server::Server(int group_id, int server_id,
   updater_ = Updater::Create(job_conf.updater());
   slice2group_ = slice2group;
   slice2server_ = slice2server;
+  dealer_ = new Dealer(Addr(grp_id_, id_, kServer));
 }
 
 Server::~Server() {
@@ -52,6 +53,7 @@ Server::~Server() {
   for (auto entry : shard_)
     for (auto param : entry.second->shares)
       delete param;
+  delete dealer_;
 }
 
 void Stop(void* running) {
@@ -73,11 +75,10 @@ void Server::Run() {
 
   bool running = true;
   CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
-  auto dealer = new Dealer(Addr(grp_id_, id_, kServer));
   // start recv loop and process requests
   while (running) {
     // cannot use blocking Receive() here, it will get stuck after workers stop.
-    Msg* msg = dealer->Receive(cluster->poll_time());
+    Msg* msg = dealer_->Receive(cluster->poll_time());
     if (msg == nullptr)
       continue;
     Msg* response = nullptr;
@@ -97,7 +98,7 @@ void Server::Run() {
           break;
         case kUpdate:
           for (auto reply : HandleUpdate(&msg))
-            dealer->Send(&reply);
+            dealer_->Send(&reply);
           break;
         case kSyncRequest:
           response = HandleSyncRequest(&msg);
@@ -111,16 +112,15 @@ void Server::Run() {
       }
     }
     if (response != nullptr)
-      dealer->Send(&response);
+      dealer_->Send(&response);
   }
 
   // send stop msg to stub
   Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
   msg->set_type(kStop);
-  dealer->Send(&msg);
+  dealer_->Send(&msg);
   std::this_thread::sleep_for(std::chrono::milliseconds(1000));
   LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
-  delete dealer;
 }
 
 Msg* Server::HandlePut(Msg **msg) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/worker.cc
----------------------------------------------------------------------
diff --git a/src/worker.cc b/src/worker.cc
index 6c461ce..5206513 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -53,7 +53,7 @@ void Worker::Setup(int grp_id, int id, const JobProto& conf,
   train_net_ = train_net;
   val_net_ = val_net;
   test_net_ = test_net;
-  bridge_dealer_ = dealer_ = nullptr;
+  InitSockets(train_net);
 }
 
 Worker::~Worker() {