You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2015/06/25 15:44:59 UTC

[1/6] incubator-singa git commit: fix bugs; TODO: wait for master to merge feature-sliceparam

Repository: incubator-singa
Updated Branches:
  refs/heads/master a0199588c -> c51f9264b


Fix bugs; TODO: wait for master to merge feature-sliceparam.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/ad13d038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/ad13d038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/ad13d038

Branch: refs/heads/master
Commit: ad13d038562966e503d8f865a078fa6d95758b96
Parents: 51d4c2a
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Tue Jun 23 15:54:47 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 11:49:32 2015 +0800

----------------------------------------------------------------------
 include/communication/msg.h |  3 ++-
 include/utils/cluster.h     |  1 +
 src/communication/msg.cc    | 13 +++++++++++++
 src/communication/socket.cc |  3 +--
 src/trainer/server.cc       |  2 +-
 src/utils/cluster.cc        |  1 +
 6 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/ad13d038/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
index 60a359a..11b6012 100644
--- a/include/communication/msg.h
+++ b/include/communication/msg.h
@@ -16,14 +16,15 @@ namespace singa {
 class Msg {
  public:
   Msg();
+  Msg(const Msg& msg);
   ~Msg();
+  int size() const;
 
   /**
     * @param first worker/server group id
     * @param second worker/server id within the group
     * @param flag 0 for server, 1 for worker, 2 for stub
     */
-<<<<<<< HEAD
   inline void set_src(int first, int second, int flag) {
     src_ = (first << kOff1) | (second << kOff2) | flag;
   }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/ad13d038/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index c11874e..3830383 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -58,6 +58,7 @@ class Cluster {
    * @return global procs id, which starts from 0.
    */
   int procs_id()const {return procs_id_;}
+  void set_procs_id(int procs_id) {procs_id_ = procs_id;}
   bool server_worker_separate() const {
     return cluster_.server_worker_separate();
   }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/ad13d038/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
index 4f41077..7ee8cad 100644
--- a/src/communication/msg.cc
+++ b/src/communication/msg.cc
@@ -7,11 +7,24 @@ Msg::Msg() {
   msg_ = zmsg_new();
 }
 
+Msg::Msg(const Msg& msg){
+  src_=msg.src_;
+  dst_=msg.dst_;
+  type_=msg.type_;
+  target_first_=msg.target_first_;
+  target_second_=msg.target_second_;
+  msg_=zmsg_dup(msg.msg_);
+}
+
 Msg::~Msg() {
   if (msg_ != nullptr)
     zmsg_destroy(&msg_);
 }
 
+int Msg::size() const{
+  return zmsg_content_size(msg_);
+}
+
 void Msg::add_frame(const void* addr, int nBytes) {
   zmsg_addmem(msg_, addr, nBytes);
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/ad13d038/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 2039162..c6925d8 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -90,8 +90,7 @@ Router::~Router() {
       zmsg_destroy(&msg);
   }
 }
-
-int Router::Bind(string endpoint){
+int Router::Bind(std::string endpoint){
   int port=-1;
   if(endpoint.length()){
     port=zsock_bind(router_, "%s", endpoint.c_str());

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/ad13d038/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 5185c51..9e0dee3 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -76,7 +76,7 @@ void Server::Run(){
           if(i!=group_id_) {
             Msg* tmp=sync;
             if(i<cluster->nserver_groups()-1)
-              tmp= new Msg(sync);
+              tmp= new Msg(*sync);
             tmp->set_dst(i, server_locator_->at(param), kServer);
             tmp->set_src(group_id_, server_id_, kServer);
             dealer_->Send(&tmp);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/ad13d038/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index d022a64..023abbe 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -63,6 +63,7 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
 
 void Cluster::Register(const string& endpoint){
   procs_id_=cluster_rt_->RegistProc(endpoint);
+  LOG(ERROR)<<endpoint;
   CHECK_GE(procs_id_,0);
   CHECK_LT(procs_id_,nprocs());
 }


[6/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild

Posted by wa...@apache.org.
SINGA-8 Implement distributed Hogwild

Handle ZooKeeper disconnection.
Change the ZooKeeper log level to ERROR.

close #14


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/c51f9264
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/c51f9264
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/c51f9264

Branch: refs/heads/master
Commit: c51f9264bb26f22cc7e49ee85ea2fc30c322cc9f
Parents: 4956d6a
Author: wang sheng <wa...@gmail.com>
Authored: Thu Jun 25 21:40:38 2015 +0800
Committer: wang sheng <wa...@gmail.com>
Committed: Thu Jun 25 21:40:38 2015 +0800

----------------------------------------------------------------------
 src/utils/cluster_rt.cc | 13 +++++++++----
 src/utils/param.cc      |  9 ++++++---
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c51f9264/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 61911fd..6143567 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -28,7 +28,7 @@ ZKClusterRT::~ZKClusterRT() {
 char zk_cxt[] = "ZKClusterRT";
 
 bool ZKClusterRT::Init() {
-  zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
+  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
   zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0,
                              static_cast<void *>(zk_cxt), 0);
   if (zkhandle_ == NULL) {
@@ -176,9 +176,14 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag,
   for (int i = 0; i < kNumRetry; ++i) {
     ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val),
                      &ZOO_OPEN_ACL_UNSAFE, flag, buf, kMaxBufLen);
-    if (ret != ZNONODE) break;
-    LOG(WARNING) << "zookeeper parent node of " << path
-                 << " not exist, retry later";
+    if (ret == ZNONODE) {
+      LOG(WARNING) << "zookeeper parent node of " << path
+                  << " not exist, retry later";
+    } else if (ret == ZCONNECTIONLOSS) {
+      LOG(WARNING) << "zookeeper disconnected, retry later";
+    } else {
+      break;
+    }
     sleep(kSleepSec);
   }
   // copy the node name ot output

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c51f9264/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index 1e05ab9..07ad8ce 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -168,7 +168,8 @@ Msg* Param::HandlePutMsg(Msg** msg){
 }
 
 Msg* Param::HandleGetMsg(Msg** msg){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
+  int copy;
+  sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy)
     (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
@@ -179,7 +180,8 @@ Msg* Param::HandleGetMsg(Msg** msg){
 }
 
 int Param::ParseUpdateMsg(Msg** msg){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
+  int copy;
+  sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
     LOG(ERROR)<<"Copy in parse update";
@@ -231,7 +233,8 @@ int Param::ParseUpdateResponseMsg(Msg **msg, int slice_idx){
 }
 
 void Param::ParseResponseMsg(Msg** msg, int slice_idx){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
+  int copy;
+  sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
     CHECK((*msg)->frame_size());


[5/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild

Posted by wa...@apache.org.
SINGA-8 Implement distributed Hogwild

Fix a bug in parameter synchronization among server groups.
The inter-process dealer cannot send messages to another process if the endpoint is a hostname, e.g., "blob-pc".
Replaced the hostname with the host IP in the binding/connecting endpoint. However, the GetHostIP method is specific to Linux (it queries the address of the "eth0" interface).
Another issue is the synchronization frequency. Currently, the stub triggers one sync reminder every time
its poller expires. If the expiry time is large, the reminder is seldom triggered; if it is small,
many reminder messages are triggered. TODO: tune the trigger.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4956d6a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4956d6a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4956d6a0

Branch: refs/heads/master
Commit: 4956d6a031de16811e4585b9c28b9ab29c33ab76
Parents: 884b9d7
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Thu Jun 25 20:53:51 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 20:53:51 2015 +0800

----------------------------------------------------------------------
 include/utils/cluster.h |  7 ++++---
 include/utils/common.h  |  1 +
 src/trainer/server.cc   | 10 ++++++++--
 src/trainer/trainer.cc  | 12 ++++++++++--
 src/utils/cluster.cc    |  4 +---
 src/utils/common.cc     | 32 ++++++++++++++++++++++++++++++++
 src/utils/param.cc      | 22 +++++++++++++---------
 7 files changed, 69 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 55b10a9..e5980ca 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -6,6 +6,7 @@
 #include <memory>
 #include <vector>
 #include <unordered_map>
+#include "utils/common.h"
 #include "proto/cluster.pb.h"
 #include "utils/cluster_rt.h"
 
@@ -123,8 +124,8 @@ class Cluster {
   }
 
   int ProcsIDOf(int group_id, int id, int flag);
-  const string hostname() const {
-    return hostname_;
+  const string hostip() const {
+    return hostip_;
   }
   void Register(const string& endpoint);
 
@@ -136,7 +137,7 @@ class Cluster {
  private:
   int procs_id_;
   int nprocs_;
-  string hostname_;
+  string hostip_;
   std::vector<std::string> endpoints_;
   // cluster config proto
   ClusterProto cluster_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
index aca35ec..d3c23a8 100644
--- a/include/utils/common.h
+++ b/include/utils/common.h
@@ -32,6 +32,7 @@ int LeastCommonMultiple(int a, int b);
 inline float rand_real(){
   return  static_cast<float>(rand())/(RAND_MAX+1.0f);
 }
+const std::string GetHostIP();
 
 class Metric {
  public:

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 9ea4509..14b25da 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -70,7 +70,12 @@ void Server::Run(){
         if(syncEntry>=master_params.size())
           continue;
         auto param=master_params.at(syncEntry);
-        if(param->local_version()!=param->version()){
+        // control the frequency of synchronization
+        // currently sync is triggerred only when the slice is updated
+        // by local worker or other workers for at least nserver_groups times.
+        // TODO may optimize the trigger condition.
+        if(abs(param->local_version()-param->version())>=cluster->nserver_groups()){
+          // TODO replace the argument (0,0) to sync a chunk instead of a slice
           sync=param->GenSyncMsg(0,0);
           for(int i=0;i<cluster->nserver_groups();i++){
             if(i!=group_id_) {
@@ -82,7 +87,7 @@ void Server::Run(){
               tmp->set_src(group_id_, server_id_, kServer);
               dealer_->Send(&tmp);
               param->set_version(param->local_version());
-              //DLOG(ERROR)<<"sync";
+              //LOG(ERROR)<<"sync slice="<<param->id()<<" to procs "<<i;
             }
           }
           syncEntry=(syncEntry+1)%master_params.size();
@@ -172,6 +177,7 @@ Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){
   CHECK_EQ((*msg)->frame_size(), param->size()*sizeof(float));
   Tensor<cpu, 1> tmp(static_cast<float*>((*msg)->frame_data()), shape);
   Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
+  //LOG(ERROR)<<"Recv sync for "<<param->id();
   if(slice2group_[param->id()]==group_id_){
     cur+=tmp;
     param->set_local_version(param->local_version()+1);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 4e7f932..58e568e 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -257,8 +257,9 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
   router_=make_shared<Router>();
   router_->Bind(kInprocRouterEndpoint);
   if(cluster->nprocs()>1){
-    int port=router_->Bind("tcp://127.0.0.1:*");
-    cluster->Register(cluster->hostname()+":"+std::to_string(port));
+    const string hostip=cluster->hostip();
+    int port=router_->Bind("tcp://"+hostip+":*");
+    cluster->Register(hostip+":"+std::to_string(port));
   }else
     cluster->set_procs_id(0);
 
@@ -312,6 +313,9 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
   poll.Add(router_.get());
   int sync_server=0, nworkers=workers.size(), nservers=servers.size();
   while(!stop){
+    // if the poll time is large, then the poller may not expire
+    // if it is small, then many reminder messages will be sent which may
+    // slow done the process of other request. TODO tune it.
     auto *sock=poll.Wait(cluster->poll_time());
     if(poll.Terminated()){
       LOG(ERROR)<<"Connection broken!";
@@ -411,6 +415,8 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
         }else{
           dst_procs_id=cluster->ProcsIDOf(msg->dst_first(),
               msg->dst_second(), msg->dst_flag());
+          if(type==kSync)
+            LOG(ERROR)<<msg->dst_first()<<","<<msg->dst_second()<<","<<dst_procs_id;
         }
         if(dst_procs_id!=procs_id_){
           // forward to other procs
@@ -474,6 +480,8 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
     for(int idx=0, id=param->slice_start();idx<param->num_slices();idx++){
       int server=slice2server_[id+idx];
       int procs=Cluster::Get()->ProcsIDOf(group, server, kServer);
+      if(procs!=procs_id_)
+        LOG(ERROR)<<"Copy for update";
       auto x=param->GenGetMsg(procs!=procs_id_, idx);
       x->set_trgt(param->owner(), id+idx, param->local_version()+1);
       x->set_src(procs_id_, gid, kStub);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index bf423b4..706d2ef 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -51,9 +51,7 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   rt->Init();
   cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
 
-  char buf[128];
-  gethostname(buf, 128);
-  hostname_=string(buf);
+  hostip_=GetHostIP();
 }
 
 void Cluster::Register(const string& endpoint){

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/utils/common.cc
----------------------------------------------------------------------
diff --git a/src/utils/common.cc b/src/utils/common.cc
index ed94856..a3242aa 100644
--- a/src/utils/common.cc
+++ b/src/utils/common.cc
@@ -6,6 +6,16 @@
 #include <google/protobuf/text_format.h>
 #include <google/protobuf/io/zero_copy_stream_impl.h>
 #include <stdarg.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+
+
 
 namespace singa {
 
@@ -106,4 +116,26 @@ int LeastCommonMultiple(int a, int b)
 
   return temp ? (a / temp * b) : 0;
 }
+
+const std::string GetHostIP(){
+  int fd;
+  struct ifreq ifr;
+
+  fd = socket(AF_INET, SOCK_DGRAM, 0);
+
+  /* I want to get an IPv4 IP address */
+  ifr.ifr_addr.sa_family = AF_INET;
+
+  /* I want IP address attached to "eth0" */
+  strncpy(ifr.ifr_name, "eth0", IFNAMSIZ-1);
+
+  ioctl(fd, SIOCGIFADDR, &ifr);
+
+  close(fd);
+
+  string ip(inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr));
+  /* display result */
+  LOG(INFO)<<"Host IP=("<<ip;
+  return ip;
+}
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index ac3a6bb..1e05ab9 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -108,7 +108,7 @@ Msg* Param::GenGetMsg(bool copy, int idx){
   CHECK_LT(idx, num_slices_);
   Msg* msg=new Msg();
   msg->set_type(kGet);
-  char buf[8]; sprintf(buf, " %c ", copy);
+  char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
   pending_get_[idx]=true;
   num_pending_requests_++;
@@ -119,11 +119,13 @@ Msg* Param::GenUpdateMsg(bool copy, int idx){
   CHECK_LT(idx, num_slices_);
   Msg* msg=new Msg();
   msg->set_type(kUpdate);
-  char buf[8]; sprintf(buf, " %c ", copy);
+  char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
   void* ptr=grad_.mutable_cpu_data()+slice_offset_[idx];
-  if(copy)
+  if(copy){
+    LOG(ERROR)<<"Copy in gen update";
     msg->add_frame(ptr, slice_size_[idx]*sizeof(float));
+  }
   else{ // to share values of grad blob
     char buf[32]; sprintf(buf, " %p ", ptr);
     msg->add_frame(buf, strlen(buf));
@@ -166,7 +168,7 @@ Msg* Param::HandlePutMsg(Msg** msg){
 }
 
 Msg* Param::HandleGetMsg(Msg** msg){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %c ", &copy);
+  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy)
     (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
@@ -177,9 +179,10 @@ Msg* Param::HandleGetMsg(Msg** msg){
 }
 
 int Param::ParseUpdateMsg(Msg** msg){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %c ", &copy);
+  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
+    LOG(ERROR)<<"Copy in parse update";
     CHECK((*msg)->frame_size());
     memcpy(mutable_cpu_grad(), (*msg)->frame_data(),(*msg)->frame_size());
   }else {// use the same data field of the grad blob
@@ -194,10 +197,12 @@ int Param::ParseUpdateMsg(Msg** msg){
 Msg* Param::GenUpdateResponseMsg(bool copy){
   Msg* msg=new Msg();
   msg->set_type(kRUpdate);
-  char buf[8]; sprintf(buf, " %c ", copy);
+  char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
-  if(copy)
+  if(copy){
+    LOG(ERROR)<<"Copy in gen";
     msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  }
   return msg;
 }
 
@@ -226,10 +231,9 @@ int Param::ParseUpdateResponseMsg(Msg **msg, int slice_idx){
 }
 
 void Param::ParseResponseMsg(Msg** msg, int slice_idx){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %c ", &copy);
+  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
-    LOG(ERROR)<<"copy";
     CHECK((*msg)->frame_size());
     memcpy(mutable_cpu_data()+slice_offset_[slice_idx],
         (*msg)->frame_data(), (*msg)->frame_size());


[3/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild. Replaced hard-coded endpoints with RegistProc() and GetProcHost(), implemented with the help of ZooKeeper. TODO: slice large Param objects in a separate branch.

Posted by wa...@apache.org.
SINGA-8 Implement distributed Hogwild
Replaced hard-coded endpoints with RegistProc() and GetProcHost(), implemented with the help of ZooKeeper.
TODO slice large Param objects in a separate branch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/51d4c2ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/51d4c2ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/51d4c2ae

Branch: refs/heads/master
Commit: 51d4c2aec4f9ddedeff588a916b25a0041dd8a88
Parents: f437011
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Fri Jun 19 14:35:50 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 11:49:32 2015 +0800

----------------------------------------------------------------------
 include/trainer/trainer.h   |  2 +-
 include/utils/cluster.h     | 19 ++++++++++---------
 include/utils/cluster_rt.h  |  6 +++---
 src/communication/socket.cc | 14 +++++++-------
 src/main.cc                 |  2 +-
 src/trainer/trainer.cc      |  8 +++++---
 src/utils/cluster.cc        | 29 ++++++++++++++++++++++++++---
 7 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index fbbfd0b..fb716bc 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -89,7 +89,7 @@ class Trainer{
    * @param clusterproto
    */
   void Start(const ModelProto& modelproto, const ClusterProto& clusterproto,
-    int procs_id);
+    const int procs_id);
 
   // TODO add Resume() function to continue training from a previously stopped
   // point.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 0eeb808..c11874e 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -23,7 +23,7 @@ namespace singa {
 class Cluster {
  public:
   static shared_ptr<Cluster> Get();
-  static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id);
+  static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id=0);
 
   const int nserver_groups()const{ return cluster_.nserver_groups(); }
   const int nworker_groups()const { return cluster_.nworker_groups(); }
@@ -71,17 +71,12 @@ class Cluster {
     return nprocs_;
   }
 
-  const string endpoint() const {
-    return endpoint(procs_id());
-  }
+
   /**
    * @return endpoint of the router of a procs with the specified id
    */
-  const string endpoint(int procs_id) const {
-    CHECK_LT(procs_id, nprocs_);
-    CHECK_GE(procs_id, 0);
-    return endpoints_.at(procs_id);
-  }
+  const string endpoint(const int procs_id) const;
+
   const string workspace() {return cluster_.workspace();}
   const string vis_folder(){
     return cluster_.workspace()+"/visualization";
@@ -127,6 +122,11 @@ class Cluster {
   }
 
   int ProcsIDOf(int group_id, int id, int flag);
+  const string hostname() const {
+    return hostname_;
+  }
+  void Register(const string& endpoint);
+
  private:
   Cluster(const ClusterProto &cluster, int procs_id) ;
   void SetupFolders(const ClusterProto &cluster);
@@ -135,6 +135,7 @@ class Cluster {
  private:
   int procs_id_;
   int nprocs_;
+  string hostname_;
   std::vector<std::string> endpoints_;
   // cluster config proto
   ClusterProto cluster_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 08bea90..1b877ec 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -10,7 +10,7 @@ namespace singa {
 typedef void (*rt_callback)(void *contest);
 
 /**
- * ClusterRuntime is a runtime service that manages dynamic configuration 
+ * ClusterRuntime is a runtime service that manages dynamic configuration
  * and status of the whole cluster. It mainly provides following services:
  *    1)  Provide running status of each server/worker
  *    2)  Translate process id to (hostname:port)
@@ -35,8 +35,8 @@ class ClusterRuntime {
    */
   virtual std::string GetProcHost(int proc_id) = 0;
   /**
-   * Server: watch all workers in a server group, 
-   * will be notified when all workers have left 
+   * Server: watch all workers in a server group,
+   * will be notified when all workers have left
    */
   virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0;
   /**

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 38c0d79..2039162 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -36,7 +36,6 @@ Dealer::Dealer(int id) : id_(id) {
   CHECK_NOTNULL(poller_);
 }
 
-<<<<<<< HEAD
 Dealer::~Dealer() {
   zsock_destroy(&dealer_);
 }
@@ -92,13 +91,14 @@ Router::~Router() {
   }
 }
 
-int Router::Bind(const std::string& endpoint) {
-  CHECK_GT(endpoint.length(), 0);
-  if (endpoint.length()) {
-    CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()), 0);
-    return 1;
+int Router::Bind(string endpoint){
+  int port=-1;
+  if(endpoint.length()){
+    port=zsock_bind(router_, "%s", endpoint.c_str());
   }
-  return 0;
+  CHECK_NE(port,-1)<<endpoint;
+  LOG(INFO)<<"bind successfully to "<<endpoint+":"+std::to_string(port);
+  return port;
 }
 
 int Router::Send(Msg **msg) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
index 77898be..851d528 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -19,7 +19,7 @@
  * easily, e.g., AddLayer(layer_type, source_layers, meta_data).
  */
 
-DEFINE_int32(procsID, 0, "Global process ID");
+DEFINE_int32(procsID, -1, "Global process ID");
 DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
 DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index bdc1416..cd80296 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -25,17 +25,14 @@ void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){
 }
 
 void HandleWorkerFinish(void * ctx){
-  /*
   HandleContext* hctx=static_cast<HandleContext*> (ctx);
   Msg* msg=new Msg();
   msg->set_src(-1,-1, kRuntime);
   msg->set_dst(hctx->group_id, hctx->id, kServer);
   msg->set_type(kStop);
   hctx->dealer->Send(&msg);
-  */
 }
 
-<<<<<<< HEAD
 const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
     const vector<shared_ptr<Param>>& params){
   std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
@@ -417,6 +414,11 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
           if (interprocs_dealers.find(dst_procs_id)==interprocs_dealers.end()){
             auto dealer=make_shared<Dealer>();
             interprocs_dealers[dst_procs_id]=dealer;
+            while(cluster->endpoint(dst_procs_id)==""){
+              std::this_thread::sleep_for(
+                  std::chrono::milliseconds(kCollectSleepTime));
+              LOG(ERROR)<<"waiting for procs "<< dst_procs_id<<" to register";
+            }
             dealer->Connect("tcp://"+cluster->endpoint(dst_procs_id));
           }
           if(bandwidth(amount, start) <=cluster->bandwidth()){

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 035e0c7..d022a64 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -1,5 +1,6 @@
 #include <glog/logging.h>
 #include <fcntl.h>
+#include <unistd.h>
 #include <fstream>
 #include "utils/cluster.h"
 #include "proto/cluster.pb.h"
@@ -13,12 +14,17 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   procs_id_=procs_id;
   cluster_ = cluster;
   SetupFolders(cluster);
+  int nprocs;
   if(server_worker_separate())
     nprocs_=nworker_procs()+nserver_procs();
   else
-    nprocs_=std::max(nworker_procs(), nserver_procs());
-  CHECK_LT(procs_id, nprocs_);
-  if(nprocs_>1){
+    nprocs=std::max(nworker_procs(), nserver_procs());
+  CHECK_LT(procs_id, nprocs);
+  if (cluster_.has_nprocs())
+    CHECK_EQ(cluster.nprocs(), nprocs);
+  else
+    cluster_.set_nprocs(nprocs);
+  if(nprocs>1&&procs_id>-1){
     std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
     std::string line;
     while(std::getline(ifs, line)
@@ -49,8 +55,25 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   auto rt=new ZKClusterRT(cluster_.zookeeper_host());
   rt->Init();
   cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
+
+  char buf[128];
+  gethostname(buf, 128);
+  hostname_=string(buf);
 }
 
+void Cluster::Register(const string& endpoint){
+  procs_id_=cluster_rt_->RegistProc(endpoint);
+  CHECK_GE(procs_id_,0);
+  CHECK_LT(procs_id_,nprocs());
+}
+const string Cluster::endpoint(int procsid) const{
+  CHECK_LT(procsid, nprocs());
+  CHECK_GE(procsid, 0);
+  if(endpoints_.size())
+    return endpoints_.at(procsid);
+  else
+    return cluster_rt_->GetProcHost(procsid);
+}
 void Cluster::SetupFolders(const ClusterProto &cluster){
   // create visulization folder
   mkdir(vis_folder().c_str(),  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);


[4/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild

Posted by wa...@apache.org.
SINGA-8 Implement distributed Hogwild

The original Param objects are sliced so that the amount of parameter data mastered by each server group is (roughly) equal.
Following Caffe's implementation, we let each server group master a subset of the param slices.
Each server group updates all model parameters for its corresponding worker groups and synchronizes with the other server groups on the slices it masters.
Tested on a single node with multiple processes, each of which has one server group (with one server) and one worker group (with one worker).
The training loss decreases more slowly than with shared-memory Hogwild. TODO: optimize and test on multiple nodes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/884b9d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/884b9d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/884b9d70

Branch: refs/heads/master
Commit: 884b9d70a631bee4961fb3907e47a747c5dd2b89
Parents: ad13d03
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Thu Jun 25 11:39:45 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 11:50:28 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/model.conf    |  1 +
 include/communication/socket.h |  2 +-
 include/trainer/trainer.h      |  3 +-
 include/utils/cluster.h        | 10 ++---
 include/utils/param.h          |  5 +--
 src/communication/msg.cc       |  4 +-
 src/communication/socket.cc    |  2 +-
 src/proto/cluster.proto        |  1 -
 src/proto/common.proto         |  1 +
 src/trainer/server.cc          | 79 ++++++++++++++++++-------------------
 src/trainer/trainer.cc         | 37 +++++++++--------
 src/utils/cluster.cc           | 15 +++----
 src/utils/param.cc             | 14 +++----
 13 files changed, 83 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/examples/cifar10/model.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/model.conf b/examples/cifar10/model.conf
index 42be6dd..2bf76b0 100644
--- a/examples/cifar10/model.conf
+++ b/examples/cifar10/model.conf
@@ -25,6 +25,7 @@ layer{
   sharddata_conf {
     path: "examples/cifar10/cifar10_train_shard"
     batchsize: 16
+    random_skip: 5000
   }
   exclude: kTest
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/communication/socket.h
----------------------------------------------------------------------
diff --git a/include/communication/socket.h b/include/communication/socket.h
index b98656e..5a9598c 100644
--- a/include/communication/socket.h
+++ b/include/communication/socket.h
@@ -59,7 +59,7 @@ class Poller {
   /**
    * @return true if the poller is terminated due to process interupt
    */
-  virtual bool Terminated()=0;
+  virtual bool Terminated();
 
  protected:
 #ifdef USE_ZMQ

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index fb716bc..31d3704 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -101,8 +101,7 @@ class Trainer{
       const ModelProto& mproto, vector<int> *slice_size);
 
   void Run(const vector<shared_ptr<Worker>>& workers,
-      const vector<shared_ptr<Server>>& servers,
-      const std::map<int, shared_ptr<ParamShard>>& shards);
+      const vector<shared_ptr<Server>>& servers);
   /**
    * Register default implementations for all base classes used in the system,
    * e.g., the Updater, BaseMsg, etc.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 3830383..55b10a9 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -32,7 +32,10 @@ class Cluster {
   int nworkers_per_procs()const{return cluster_.nworkers_per_procs();}
   int nservers_per_procs()const{return cluster_.nservers_per_procs();}
   int nworker_groups_per_server_group() const {
-    return cluster_.nworker_groups()/cluster_.nserver_groups();
+    if(nserver_groups()==0||nservers_per_group()==0)
+      return 1;
+    else
+      return cluster_.nworker_groups()/cluster_.nserver_groups();
   }
 
   /**
@@ -49,10 +52,7 @@ class Cluster {
    * @return true if the calling procs has worker threads.
    */
   bool has_worker()const {
-    if(server_worker_separate()){
-      return procs_id_<nworker_procs();
-    }else
-      return procs_id_<nprocs_;
+    return procs_id_<nworker_procs();
   }
   /**
    * @return global procs id, which starts from 0.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
index d449fba..61e862b 100644
--- a/include/utils/param.h
+++ b/include/utils/param.h
@@ -39,7 +39,7 @@ class Param {
    * This function is called at server side where the Param is actually a slice
    * of an original Param object.
    * */
-  virtual Msg* GenSyncMsg();
+  virtual Msg* GenSyncMsg(int offset, int size);
   /**
    * Generate the message to response the update request.
    *
@@ -70,8 +70,6 @@ class Param {
    * \copydetails HandleGetMsg(Msg**)
    */
   virtual Msg* HandleSyncMsg(Msg** msg);
-
-<<<<<<< HEAD
   /**
    * Server parses update request message.
    *
@@ -106,7 +104,6 @@ class Param {
    * @param shape
    */
   virtual void Setup(const ParamProto& proto, const std::vector<int>& shape);
-  virtual void Setup(const vector<int>& shape);
   /*
    * Fill the values according to initmethod, e.g., gaussian distribution
    *

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
index 7ee8cad..38512d2 100644
--- a/src/communication/msg.cc
+++ b/src/communication/msg.cc
@@ -11,8 +11,8 @@ Msg::Msg(const Msg& msg){
   src_=msg.src_;
   dst_=msg.dst_;
   type_=msg.type_;
-  target_first_=msg.target_first_;
-  target_second_=msg.target_second_;
+  trgt_first_=msg.trgt_first_;
+  trgt_second_=msg.trgt_second_;
   msg_=zmsg_dup(msg.msg_);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index c6925d8..0cb0982 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -90,7 +90,7 @@ Router::~Router() {
       zmsg_destroy(&msg);
   }
 }
-int Router::Bind(std::string endpoint){
+int Router::Bind(const std::string& endpoint){
   int port=-1;
   if(endpoint.length()){
     port=zsock_bind(router_, "%s", endpoint.c_str());

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index 3317f2a..8fbdbbe 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -43,7 +43,6 @@ message ClusterProto {
   optional int32 bandwidth=50 [default=134217728];
   // poll time in milliseconds
   optional int32 poll_time=51 [default =100];
->>>>>>> SINGA-8 Implement distributed Hogwild
 }
 
 message ServerTopology {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/proto/common.proto
----------------------------------------------------------------------
diff --git a/src/proto/common.proto b/src/proto/common.proto
index 6bc0919..70b743c 100644
--- a/src/proto/common.proto
+++ b/src/proto/common.proto
@@ -13,6 +13,7 @@ enum MsgType {
   kRUpdate = 9;
   kConnect = 10;
   kMetric = 11;
+  kSyncReminder = 12;
 };
 
 enum EntityType {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 9e0dee3..9ea4509 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -7,6 +7,7 @@
 #include "utils/singleton.h"
 #include "utils/factory.h"
 #include "utils/cluster.h"
+#include "proto/common.pb.h"
 
 using namespace mshadow;
 namespace singa {
@@ -34,8 +35,9 @@ void Server::Run(){
   ping->add_frame("PING", 4);
   ping->set_type(kConnect);
   dealer_->Send(&ping);
-  int syncEntry=0;
-	//start recv loop and process requests
+  vector<shared_ptr<Param>> master_params;
+  size_t syncEntry=0;
+  //start recv loop and process requests
   while (true){
     Msg* msg=dealer_->Receive();
     if (msg==nullptr)
@@ -53,46 +55,39 @@ void Server::Run(){
       CHECK_STREQ("PONG", pong.c_str());
       DeleteMsg(&msg);
     }else if(type==kPut){
+      int pid = msg->trgt_second();
       response = HandlePut(&msg);
+      if(slice2group_[pid]==group_id_)
+        master_params.push_back(shard_->at(pid));
     }else{
       int pid=msg->trgt_second();
       if(shard_->find(pid)==shard_->end()){
         // delay the processing by re-queue the msg.
         response=msg;
         DLOG(ERROR)<<"Requeue msg";
-    }else if(type==kSyncReminder){
-      DeleteMsg(&msg);
-      unsigned nchecks=0, nparams=shard_->size();
-      while(nchecks<nparams
-          &&group_locator_->at(shard_->at(syncEntry))!=group_id_){
-        syncEntry=(syncEntry+1)%nparams;
-        nchecks++;
-      }
-      if(nchecks==nparams) continue;
-      auto param=shard_->at(syncEntry);
-      if(param->local_version()!=param->version()){
-        sync=param->GenSyncMsg(true);
-        for(int i=0;i<cluster->nserver_groups();i++){
-          if(i!=group_id_) {
-            Msg* tmp=sync;
-            if(i<cluster->nserver_groups()-1)
-              tmp= new Msg(*sync);
-            tmp->set_dst(i, server_locator_->at(param), kServer);
-            tmp->set_src(group_id_, server_id_, kServer);
-            dealer_->Send(&tmp);
-            param->set_version(param->local_version());
-            //DLOG(ERROR)<<"sync";
+      }else if(type == kSyncReminder){
+        DeleteMsg(&msg);
+        if(syncEntry>=master_params.size())
+          continue;
+        auto param=master_params.at(syncEntry);
+        if(param->local_version()!=param->version()){
+          sync=param->GenSyncMsg(0,0);
+          for(int i=0;i<cluster->nserver_groups();i++){
+            if(i!=group_id_) {
+              Msg* tmp=sync;
+              if(i<cluster->nserver_groups()-1)
+                tmp= new Msg(*sync);
+              // assume only one server per group, TODO generalize it
+              tmp->set_dst(i, 0, kServer);
+              tmp->set_src(group_id_, server_id_, kServer);
+              dealer_->Send(&tmp);
+              param->set_version(param->local_version());
+              //DLOG(ERROR)<<"sync";
+            }
           }
+          syncEntry=(syncEntry+1)%master_params.size();
         }
-      }
-    }else {
-      int pid=msg->target_first();
-      if(shard_->find(pid)==shard_->end()){
-        // delay the processing by re-queue the msg.
-        response=msg;
-        LOG(ERROR)<<"Requeue";
->>>>>>> SINGA-8 Implement distributed Hogwild
-      } else{
+      }else{
         auto param=shard_->at(pid);
         switch (type){
           case kGet:
@@ -118,7 +113,7 @@ void Server::Run(){
 
 Msg* Server::HandlePut(Msg **msg){
   int version=(*msg)->trgt_third();
-  int pid=(*msg)->target_first();
+  int pid=(*msg)->trgt_second();
   shared_ptr<Param> param=nullptr;
   if(shard_->find(pid)!=shard_->end()){
     LOG(ERROR)<<"Param ("<<pid<<") is put more than once";
@@ -126,19 +121,21 @@ Msg* Server::HandlePut(Msg **msg){
   }else{
     auto factory=Singleton<Factory<Param>>::Instance();
     param=shared_ptr<Param>(factory ->Create("Param"));
-    param->set_id(pid);
     (*shard_)[pid]=param;
   }
   auto response=param->HandlePutMsg(msg);
   // must set version after HandlePutMsg which allocates the memory
   param->set_version(version);
+  param->set_local_version(version);
+  param->set_id(pid);
   if(Cluster::Get()->nserver_groups()>1 &&
-      group_locator_->at(param)!=group_id_){
+      slice2group_[pid]!=group_id_){
     last_data_[pid]=std::make_shared<Blob<float>>();
     last_data_[pid]->ReshapeLike(param->data());
     last_data_[pid]->CopyFrom(param->data());
   }
-  LOG(INFO)<<"Server put param "<<pid<<" size="<<param->size()<<" Bytes";
+  LOG(INFO)<<"server ("<<group_id_<<", "<<server_id_
+    <<") put slice="<<pid<<" size="<<param->size();
   return response;
 }
 
@@ -161,9 +158,9 @@ Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) {
   int step=(*msg)->trgt_third();
   bool copy=param->ParseUpdateMsg(msg);
   updater_->Update(step, param);
-  param->set_version(param->version()+1);
+  param->set_local_version(param->local_version()+1);
   auto response=param->GenUpdateResponseMsg(copy);
-  response->set_trgt(paramid, sliceid, param->version());
+  response->set_trgt(paramid, sliceid, param->local_version());
   response->SetAddr(tmp);
   delete tmp;
   return response;
@@ -175,7 +172,7 @@ Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){
   CHECK_EQ((*msg)->frame_size(), param->size()*sizeof(float));
   Tensor<cpu, 1> tmp(static_cast<float*>((*msg)->frame_data()), shape);
   Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  if(group_locator_->at(param)==group_id_){
+  if(slice2group_[param->id()]==group_id_){
     cur+=tmp;
     param->set_local_version(param->local_version()+1);
   }else{
@@ -188,7 +185,7 @@ Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){
     if(bandwidth>0){
       response=new Msg();
       response->set_type(kSyncRequest);
-      response->set_target(param->id(), param->version());
+      response->set_trgt(-1, param->id(), param->version());
       response->add_frame(diff.dptr, param->size()*sizeof(float));
       (*msg)->SwapAddr();
       response->SetAddr(*msg);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index cd80296..4e7f932 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -179,12 +179,12 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
   auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
       cluster->nworkers_per_group());
   int lcm=LeastCommonMultiple(cluster->nserver_groups(), cluster->nservers_per_group());
-    auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
-    for(auto param: net->params()){
-      if(param->id()==param->owner())
-        for(auto entry: paramid2slices[param->id()])
-          slice_size->push_back(entry.second);
-    }
+  auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
+  for(auto param: net->params()){
+    if(param->id()==param->owner())
+      for(auto entry: paramid2slices[param->id()])
+        slice_size->push_back(entry.second);
+  }
 
   for(int gid=gstart;gid<gend;gid++){
     shared_ptr<NeuralNet> train_net, test_net, validation_net;
@@ -249,15 +249,20 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads,
 
 void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
     int procs_id){
-  procs_id_=procs_id;
+  // procs_id is only used for resume training
+  CHECK_EQ(procs_id, -1);
   RegisterDefaultClasses(mproto);
 
   auto cluster=Cluster::Get(cproto, procs_id);
   router_=make_shared<Router>();
   router_->Bind(kInprocRouterEndpoint);
-  if(cluster->nprocs()>1)
-    router_->Bind(cluster->endpoint());
+  if(cluster->nprocs()>1){
+    int port=router_->Bind("tcp://127.0.0.1:*");
+    cluster->Register(cluster->hostname()+":"+std::to_string(port));
+  }else
+    cluster->set_procs_id(0);
 
+  procs_id_ = cluster->procs_id();
   int nthreads=1;
   // create workers
   vector<int> slices;
@@ -280,7 +285,7 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
     threads.push_back(std::thread(&Server::Run,server.get()));
   for(auto worker: workers)
     threads.push_back(std::thread(&Worker::Run,worker.get()));
-  Run(workers, servers, shards);
+  Run(workers, servers);
   for(auto& thread: threads)
     thread.join();
   for(auto x: ctx)
@@ -292,9 +297,9 @@ inline int bandwidth(int bytes, system_clock::time_point start){
   auto duration=duration_cast<TimeT> (now - start);
   return static_cast<int>(bytes*1000.f/duration.count());
 }
+
 void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
-    const vector<shared_ptr<Server>>& servers,
-    const std::map<int, shared_ptr<Trainer::ParamShard>>& shards){
+    const vector<shared_ptr<Server>>& servers){
   auto cluster=Cluster::Get();
   procs_id_=cluster->procs_id();
   LOG(INFO)<<"Stub in process "<<procs_id_<<" starts";
@@ -307,7 +312,7 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
   poll.Add(router_.get());
   int sync_server=0, nworkers=workers.size(), nservers=servers.size();
   while(!stop){
-    Socket *sock=poll.Wait(cluster->poll_time());
+    auto *sock=poll.Wait(cluster->poll_time());
     if(poll.Terminated()){
       LOG(ERROR)<<"Connection broken!";
       exit(0);
@@ -321,7 +326,6 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
         msg->set_type(kSyncReminder);
         sync_server=(sync_server+1)%servers.size();
         router_->Send(&msg);
-        //LOG(ERROR)<<"Reminder";
       }
       continue;
     }
@@ -345,14 +349,13 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
             nservers--;
           else if (msg->src_flag()==kWorkerParam)
             nworkers--;
-          delete msg;
-          msg=nullptr;
+          DeleteMsg(&msg);
           if(nworkers==0&&nservers==0){
             stop=true;
             break;
           }
         }else if(type==kMetric){
-          if(msg->src_first()==0){
+          if(msg->src_first()>=0){
             int step=msg->trgt_first();
             string prefix((char*)msg->frame_data(), msg->frame_size());
             msg->next_frame();

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 023abbe..bf423b4 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -14,21 +14,16 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   procs_id_=procs_id;
   cluster_ = cluster;
   SetupFolders(cluster);
-  int nprocs;
   if(server_worker_separate())
     nprocs_=nworker_procs()+nserver_procs();
   else
-    nprocs=std::max(nworker_procs(), nserver_procs());
-  CHECK_LT(procs_id, nprocs);
-  if (cluster_.has_nprocs())
-    CHECK_EQ(cluster.nprocs(), nprocs);
-  else
-    cluster_.set_nprocs(nprocs);
-  if(nprocs>1&&procs_id>-1){
+    nprocs_=std::max(nworker_procs(), nserver_procs());
+  CHECK_LT(procs_id, nprocs_);
+  if(nprocs_>1&&procs_id>-1){
     std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
     std::string line;
-    while(std::getline(ifs, line)
-        &&endpoints_.size()<static_cast<size_t>(nprocs_)){
+    while(std::getline(ifs, line)&&
+        endpoints_.size()< static_cast<size_t>(nprocs_)){
       endpoints_.push_back(line);
     }
     CHECK_EQ(endpoints_.size(), nprocs_);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index 4ad17ce..ac3a6bb 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -133,11 +133,11 @@ Msg* Param::GenUpdateMsg(bool copy, int idx){
   return msg;
 }
 
-Msg* Param::GenSyncMsg(bool copy, int v){
+Msg* Param::GenSyncMsg(int offset, int size){
   Msg* msg=new Msg();
   msg->set_type(kSyncRequest);
-  msg->set_target(id(), local_version());
-  msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  msg->set_trgt(-1, id(), local_version());
+  msg->add_frame(mutable_cpu_data(), data_->count()*sizeof(float));
   return msg;
 }
 
@@ -150,9 +150,10 @@ Msg* Param::HandlePutMsg(Msg** msg){
   proto_.set_learning_rate_multiplier(lr);
   proto_.set_weight_decay_multiplier(wc);
   vector<int> shape{size};
-  Setup(shape);
-  set_local_version((*msg)->target_second());
-  set_version((*msg)->target_second());
+  ParamProto proto;
+  Setup(proto, shape);
+  set_local_version((*msg)->trgt_third());
+  set_version((*msg)->trgt_third());
   if(ptr==nullptr){
     CHECK((*msg)->next_frame());
     CHECK_EQ(size* sizeof(float), (*msg)->frame_size());
@@ -205,7 +206,6 @@ Msg* Param::HandleSyncMsg(Msg** msg){
   return nullptr;
 }
 
-<<<<<<< HEAD
 int Param::ParseSyncResponseMsg(Msg** msg, int slice_idx){
   DeleteMsg(msg);
   return 1;


[2/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild The program is simply tested using two processes. TODO 1. read process endpoints from the zookeeper instead of hard-coding them. 2. split large parameters to avoid load-balance issue

Posted by wa...@apache.org.
SINGA-8 Implement distributed Hogwild
The program has only been lightly tested, using two processes.
TODO
1. Read process endpoints from ZooKeeper instead of hard-coding them.
2. Split large parameters to avoid load imbalance among server groups.
Currently, server groups are assigned an (almost) equal number of Param objects,
but these objects may differ greatly in memory size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f4370118
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f4370118
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f4370118

Branch: refs/heads/master
Commit: f4370118c91f688fdc8c84d0d590096f2e93586c
Parents: a019958
Author: wang wei <wa...@comp.nus.edu.sg>
Authored: Wed Jun 17 16:17:19 2015 +0800
Committer: wang wei <wa...@comp.nus.edu.sg>
Committed: Thu Jun 25 11:49:32 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/cluster-dist.conf |   8 +++
 examples/cifar10/hostfile          |  22 +-----
 include/communication/msg.h        |   2 +-
 include/communication/socket.h     |  11 ++-
 include/trainer/server.h           |  14 ++--
 include/trainer/trainer.h          |   5 +-
 include/utils/cluster.h            |  10 ++-
 include/utils/param.h              |   3 +-
 src/communication/socket.cc        |   8 ++-
 src/proto/cluster.proto            |   6 ++
 src/test/test_paramslicer.cc       |  47 +++++++++++++
 src/trainer/server.cc              | 121 ++++++++++++++++++++++++++------
 src/trainer/trainer.cc             |  62 +++++++++++++++-
 src/trainer/worker.cc              |   8 ++-
 src/utils/param.cc                 |  15 ++--
 15 files changed, 272 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/examples/cifar10/cluster-dist.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/cluster-dist.conf b/examples/cifar10/cluster-dist.conf
new file mode 100644
index 0000000..1a4e2c2
--- /dev/null
+++ b/examples/cifar10/cluster-dist.conf
@@ -0,0 +1,8 @@
+nworker_groups: 2
+nserver_groups: 2
+nservers_per_group: 1
+nworkers_per_group: 1
+nworkers_per_procs: 1
+workspace: "examples/cifar10/"
+hostfile: "examples/cifar10/hostfile"
+poll_time: 100

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/examples/cifar10/hostfile
----------------------------------------------------------------------
diff --git a/examples/cifar10/hostfile b/examples/cifar10/hostfile
index 83e06e5..eda7414 100644
--- a/examples/cifar10/hostfile
+++ b/examples/cifar10/hostfile
@@ -1,20 +1,2 @@
-awan-2-26-0
-awan-2-27-0
-awan-2-28-0
-awan-2-29-0
-awan-2-30-0
-awan-2-31-0
-awan-2-32-0
-awan-2-33-0
-awan-2-34-0
-awan-2-35-0
-awan-2-36-0
-awan-2-37-0
-awan-2-38-0
-awan-2-39-0
-awan-2-40-0
-awan-2-41-0
-awan-2-42-0
-awan-2-43-0
-awan-2-44-0
-awan-2-45-0
+localhost:9733
+localhost:9734

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
index c3ef1c7..60a359a 100644
--- a/include/communication/msg.h
+++ b/include/communication/msg.h
@@ -23,6 +23,7 @@ class Msg {
     * @param second worker/server id within the group
     * @param flag 0 for server, 1 for worker, 2 for stub
     */
+<<<<<<< HEAD
   inline void set_src(int first, int second, int flag) {
     src_ = (first << kOff1) | (second << kOff2) | flag;
   }
@@ -78,7 +79,6 @@ class Msg {
   void ParseFromZmsg(zmsg_t* msg);
   zmsg_t* DumpToZmsg();
 #endif
-
  protected:
   static const unsigned int kOff1 = 16;
   static const unsigned int kOff2 = 4;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/include/communication/socket.h
----------------------------------------------------------------------
diff --git a/include/communication/socket.h b/include/communication/socket.h
index d1cb400..b98656e 100644
--- a/include/communication/socket.h
+++ b/include/communication/socket.h
@@ -19,10 +19,10 @@ class SocketInterface {
  public:
   virtual ~SocketInterface() {}
   /**
-    * Send a message to connected socket(s), non-blocking. The message 
-    * will be deallocated after sending, thus should not be used after 
+    * Send a message to connected socket(s), non-blocking. The message
+    * will be deallocated after sending, thus should not be used after
     * calling Send();
-    * 
+    *
     * @param msg The message to be sent
     * @return 1 for success queuing the message for sending, 0 for failure
     */
@@ -56,6 +56,11 @@ class Poller {
     */
   SocketInterface* Wait(int duration);
 
+  /**
+   * @return true if the poller is terminated due to process interupt
+   */
+  virtual bool Terminated()=0;
+
  protected:
 #ifdef USE_ZMQ
   zpoller_t *poller_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index b07741f..a8995fb 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -27,6 +27,12 @@ class Server{
   void Setup(const UpdaterProto& proto, shared_ptr<ServerShard> shard,
       const vector<int>& slice2group);
   void Run();
+  const int group_id() const {
+    return group_id_;
+  }
+  const int server_id() const {
+    return server_id_;
+  }
 
  protected:
 
@@ -50,24 +56,20 @@ class Server{
    * @return the original message or response message. If we don't want need to
    * acknowledge the put request, then return nullptr.
 	 */
-	virtual void HandlePut(shared_ptr<Param> param, Msg **msg);
+	virtual Msg* HandlePut(Msg **msg);
 
 	/**
    * TODO Process SYNC request.
 	 */
 	virtual Msg* HandleSyncRequest(shared_ptr<Param> param, Msg** msg);
 
-	/**
-   * TODO Process SYNC response.
-	virtual int HandleSyncResponse(shared_ptr<Param> param, Msg** msg);
-	 */
-
  protected:
   int thread_id_,group_id_, server_id_;
   shared_ptr<Dealer> dealer_;
   shared_ptr<Updater> updater_;
   shared_ptr<ServerShard> shard_;
   vector<int> slice2group_;
+  std::map<int, shared_ptr<Blob<float>>> last_data_;
 };
 } /* Server */
 #endif //INCLUDE_TRAINER_SERVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index ed93374..fbbfd0b 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -95,13 +95,14 @@ class Trainer{
   // point.
 
  protected:
-
   vector<shared_ptr<Server>> CreateServers(int nthread, const ModelProto& mproto,
       const vector<int> slices, vector<HandleContext*>* ctx);
   vector<shared_ptr<Worker>> CreateWorkers(int nthread,
       const ModelProto& mproto, vector<int> *slice_size);
 
-  void Run(int nworkers, int nservers);
+  void Run(const vector<shared_ptr<Worker>>& workers,
+      const vector<shared_ptr<Server>>& servers,
+      const std::map<int, shared_ptr<ParamShard>>& shards);
   /**
    * Register default implementations for all base classes used in the system,
    * e.g., the Updater, BaseMsg, etc.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 9648bfe..0eeb808 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -112,11 +112,15 @@ class Cluster {
   }
 
   /**
-   * bandwidth MB/s
-  float bandwidth() const {
+   * bandwidth Bytes/s
+   */
+  const int bandwidth() const {
     return cluster_.bandwidth();
   }
-   */
+
+  const int poll_time() const {
+    return cluster_.poll_time();
+  }
 
   shared_ptr<ClusterRuntime> runtime() const {
     return cluster_rt_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
index 897c97a..d449fba 100644
--- a/include/utils/param.h
+++ b/include/utils/param.h
@@ -71,6 +71,7 @@ class Param {
    */
   virtual Msg* HandleSyncMsg(Msg** msg);
 
+<<<<<<< HEAD
   /**
    * Server parses update request message.
    *
@@ -105,6 +106,7 @@ class Param {
    * @param shape
    */
   virtual void Setup(const ParamProto& proto, const std::vector<int>& shape);
+  virtual void Setup(const vector<int>& shape);
   /*
    * Fill the values according to initmethod, e.g., gaussian distribution
    *
@@ -238,7 +240,6 @@ class Param {
   ParamProto proto_;
   int local_version_;
 };
-
 }  // namespace singa
 
 #endif  // INCLUDE_UTILS_PARAM_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 5321724..38c0d79 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -19,9 +19,14 @@ SocketInterface* Poller::Wait(int timeout) {
   zsock_t* sock = static_cast<zsock_t*>(zpoller_wait(poller_, timeout));
   if (sock != nullptr)
     return zsock2Socket_[sock];
-  return nullptr;
+  else
+    return nullptr;
+}
+
+bool Poller::Terminated(){
+  return zpoller_terminated(poller_);
 }
 
 Dealer::Dealer() : Dealer(-1) {}
 
 Dealer::Dealer(int id) : id_(id) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index 4f7e661..3317f2a 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -38,6 +38,11 @@ message ClusterProto {
   optional bool server_update = 40 [default = true];
   // share memory space between worker groups in one procs
   optional bool share_memory = 41 [default = true];
+
+  // bandwidth of ethernet, Bytes per second, default is 1 Gbps (2^27 Bytes/s)
+  optional int32 bandwidth = 50 [default = 134217728];
+  // poll time in milliseconds
+  optional int32 poll_time = 51 [default = 100];
 }
 
 message ServerTopology {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/test/test_paramslicer.cc
----------------------------------------------------------------------
diff --git a/src/test/test_paramslicer.cc b/src/test/test_paramslicer.cc
new file mode 100644
index 0000000..bbff616
--- /dev/null
+++ b/src/test/test_paramslicer.cc
@@ -0,0 +1,47 @@
+#include "utils/param.h"
+#include "gtest/gtest.h"
+
+
+using namespace singa;
+
+const int param_size[]={2400,32,25600,32, 51200,64,57600,10};
+// number of entries in param_size, computed once for all tests
+const int kNumParams=sizeof(param_size)/sizeof(param_size[0]);
+
+class ParamSlicerTest : public ::testing::Test {
+  public:
+    ParamSlicerTest() {
+      ParamProto proto;
+      for(int i=0;i<kNumParams;i++){
+        vector<int> shape{param_size[i]};
+        auto param=std::make_shared<Param>();
+        param->Setup(proto, shape);
+        param->set_id(i);
+        params.push_back(param);
+      }
+    }
+  protected:
+    vector<shared_ptr<Param>> params;
+};
+
+// all params are stored in one box, no need to split
+TEST_F(ParamSlicerTest, OneBox){
+  ParamSlicer slicer;
+  int num=1;
+  auto slices=slicer.Slice(num, params);
+  // compare as unsigned values to avoid signed/unsigned mismatch in ASSERT_EQ
+  ASSERT_EQ(slices.size(), static_cast<size_t>(kNumParams));
+  ASSERT_EQ(slicer.Get(1).size(), 1u);
+  ASSERT_EQ(slicer.Get(2).size(), 1u);
+  ASSERT_EQ(static_cast<size_t>(slicer.Get(kNumParams-1).back()), slices.size()-1);
+}
+
+// there are multiple boxes
+TEST_F(ParamSlicerTest, MultipleBox){
+  ParamSlicer slicer;
+  int num=4;
+  auto slices=slicer.Slice(num, params);
+  ASSERT_EQ(slicer.Get(1).size(), 1u);
+  ASSERT_EQ(slicer.Get(3).size(), 1u);
+  ASSERT_EQ(static_cast<size_t>(slicer.Get(kNumParams-1).back()), slices.size()-1);
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 04f6040..5185c51 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -1,13 +1,14 @@
 #include <list>
 #include <tuple>
 #include <queue>
+#include "mshadow/tensor.h"
 #include "trainer/server.h"
 #include "utils/param.h"
 #include "utils/singleton.h"
 #include "utils/factory.h"
 #include "utils/cluster.h"
 
-
+using namespace mshadow;
 namespace singa {
 Server::Server(int thread_id,int group_id, int server_id):
   thread_id_(thread_id),group_id_(group_id), server_id_(server_id){}
@@ -23,21 +24,23 @@ void Server::Setup(const UpdaterProto& proto,
 }
 
 void Server::Run(){
+  LOG(INFO)<<"Server (group_id= "<<group_id_<<", id="<<server_id_<<") starts";
   dealer_=std::make_shared<Dealer>(2*thread_id_);
   dealer_->Connect(kInprocRouterEndpoint);
-
+  auto cluster=Cluster::Get();
   Msg* ping=new Msg();
   ping->set_src(group_id_, server_id_, kServer);
   ping->set_dst(-1,-1,kStub);
   ping->add_frame("PING", 4);
   ping->set_type(kConnect);
   dealer_->Send(&ping);
+  int syncEntry=0;
 	//start recv loop and process requests
   while (true){
     Msg* msg=dealer_->Receive();
     if (msg==nullptr)
       break;
-    Msg* response=nullptr;
+    Msg* response=nullptr, *sync=nullptr;
     int type=msg->type();
     if (type== kStop){
       msg->set_src(group_id_, server_id_, kServer);
@@ -48,26 +51,41 @@ void Server::Run(){
       // TODO remove receiving pong msg
       string pong((char*)msg->frame_data(), msg->frame_size());
       CHECK_STREQ("PONG", pong.c_str());
-      delete msg;
+      DeleteMsg(&msg);
     }else if(type==kPut){
-      int pid=msg->trgt_second();
-      shared_ptr<Param> param=nullptr;
-      if(shard_->find(pid)!=shard_->end()){
-        LOG(ERROR)<<"Param ("<<pid<<") is put more than once";
-        param=shard_->at(pid);
-      }else{
-        param=shared_ptr<Param>(Singleton<Factory<Param>>::Instance()
-            ->Create("Param"));
-        param->set_id(pid);
-        (*shard_)[pid]=param;
-      }
-      HandlePut(param, &msg);
-    }else{
-      int pid=msg->trgt_second();
+      response = HandlePut(&msg);
+    }else if(type==kSyncReminder){
+      DeleteMsg(&msg);
+      unsigned nchecks=0, nparams=shard_->size();
+      while(nchecks<nparams
+          &&group_locator_->at(shard_->at(syncEntry))!=group_id_){
+        syncEntry=(syncEntry+1)%nparams;
+        nchecks++;
+      }
+      if(nchecks==nparams) continue;
+      auto param=shard_->at(syncEntry);
+      // advance the cursor; otherwise every reminder re-visits the same param
+      syncEntry=(syncEntry+1)%nparams;
+      if(param->local_version()!=param->version()){
+        sync=param->GenSyncMsg(true);
+        for(int i=0;i<cluster->nserver_groups();i++){
+          if(i!=group_id_) {
+            // Msg(const Msg&) takes a reference; send one copy per group
+            Msg* tmp=new Msg(*sync);
+            tmp->set_dst(i, server_locator_->at(param), kServer);
+            tmp->set_src(group_id_, server_id_, kServer);
+            dealer_->Send(&tmp);
+          }
+        }
+        param->set_version(param->local_version());
+        DeleteMsg(&sync);
+      }
+    }else{
+      int pid=msg->target_first();
       if(shard_->find(pid)==shard_->end()){
         // delay the processing by re-queue the msg.
         response=msg;
         DLOG(ERROR)<<"Requeue msg";
       } else{
         auto param=shard_->at(pid);
         switch (type){
@@ -80,20 +98,43 @@ void Server::Run(){
           case kSyncRequest:
             response = HandleSyncRequest(param, &msg);
             break;
-        }
-        if (response!=nullptr){
-          dealer_->Send(&response);
+          default:
+            LOG(ERROR)<<"Unknown message type "<<type;
+            DeleteMsg(&msg);
+            break;
         }
       }
     }
+    if (response!=nullptr)
+      dealer_->Send(&response);
   }
+  LOG(INFO)<<"Server (group_id= "<<group_id_<<", id="<<server_id_<<") stops";
 }
 
-void Server::HandlePut(shared_ptr<Param> param, Msg **msg){
+Msg* Server::HandlePut(Msg **msg){
   int version=(*msg)->trgt_third();
-  param->HandlePutMsg(msg);
+  int pid=(*msg)->target_first();
+  shared_ptr<Param> param=nullptr;
+  if(shard_->find(pid)!=shard_->end()){
+    LOG(ERROR)<<"Param ("<<pid<<") is put more than once";
+    param=shard_->at(pid);
+  }else{
+    auto factory=Singleton<Factory<Param>>::Instance();
+    param=shared_ptr<Param>(factory ->Create("Param"));
+    param->set_id(pid);
+    (*shard_)[pid]=param;
+  }
+  auto response=param->HandlePutMsg(msg);
   // must set version after HandlePutMsg which allocates the memory
   param->set_version(version);
+  if(Cluster::Get()->nserver_groups()>1 &&
+      group_locator_->at(param)!=group_id_){
+    last_data_[pid]=std::make_shared<Blob<float>>();
+    last_data_[pid]->ReshapeLike(param->data());
+    last_data_[pid]->CopyFrom(param->data());
+  }
+  LOG(INFO)<<"Server put param "<<pid<<" size="<<param->size()*sizeof(float)<<" Bytes";
+  return response;
 }
 
 Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){
@@ -124,7 +165,38 @@ Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) {
 }
 
 Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){
-  return param->HandleSyncMsg(msg);
+  Msg* response=nullptr;
+  auto shape=Shape1(param->size());
+  CHECK_EQ((*msg)->frame_size(), param->size()*sizeof(float));
+  Tensor<cpu, 1> tmp(static_cast<float*>((*msg)->frame_data()), shape);
+  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
+  if(group_locator_->at(param)==group_id_){
+    cur+=tmp;
+    param->set_local_version(param->local_version()+1);
+  }else{
+    TensorContainer<cpu, 1> diff(shape);
+    Tensor<cpu, 1> prev(last_data_[param->id()]->mutable_cpu_data(), shape);
+    diff=cur-prev;
+    CHECK((*msg)->next_frame());
+    // the stub appends the spare bandwidth as a decimal string without a NUL
+    string bw_str(static_cast<char*>((*msg)->frame_data()), (*msg)->frame_size());
+    int bandwidth=0;
+    sscanf(bw_str.c_str(), "%d", &bandwidth);
+    if(bandwidth>0){
+      response=new Msg();
+      response->set_type(kSyncRequest);
+      response->set_target(param->id(), param->version());
+      response->add_frame(diff.dptr, param->size()*sizeof(float));
+      (*msg)->SwapAddr();
+      response->SetAddr(*msg);
+      prev=diff+tmp;
+      Copy(cur, prev);
+    }else{
+      Copy(prev, tmp);
+      cur=tmp+diff;
+    }
+  }
+  DeleteMsg(msg);
+  return response;
 }
-
 } /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 6c08a3a..bdc1416 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -2,12 +2,16 @@
 #include <vector>
 #include <map>
 #include <queue>
+#include <chrono>
 #include <glog/logging.h>
 #include "proto/common.pb.h"
 #include "trainer/trainer.h"
 #include "mshadow/tensor.h"
 using std::vector;
 using std::map;
+using namespace std::chrono;
+
+typedef std::chrono::milliseconds TimeT;
 
 namespace singa {
 
@@ -21,14 +25,16 @@ void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){
 }
 
 void HandleWorkerFinish(void * ctx){
+  /*
   HandleContext* hctx=static_cast<HandleContext*> (ctx);
   Msg* msg=new Msg();
   msg->set_src(-1,-1, kRuntime);
   msg->set_dst(hctx->group_id, hctx->id, kServer);
   msg->set_type(kStop);
   hctx->dealer->Send(&msg);
+  */
 }
 
 const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
     const vector<shared_ptr<Param>>& params){
   std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
@@ -276,20 +282,53 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
     threads.push_back(std::thread(&Server::Run,server.get()));
   for(auto worker: workers)
     threads.push_back(std::thread(&Worker::Run,worker.get()));
-  Run(workers.size(), servers.size());
+  Run(workers, servers, shards);
   for(auto& thread: threads)
     thread.join();
   for(auto x: ctx)
     delete x;
 }
 
-void Trainer::Run(int nworkers, int nservers){
+inline int bandwidth(int bytes, system_clock::time_point start){
+  auto now=system_clock::now();
+  auto duration=duration_cast<TimeT> (now - start);
+  // guard against < 1ms elapsed: x/0 yields inf/NaN and casting that to int is UB
+  TimeT::rep ms=duration.count()>0 ? duration.count() : 1;
+  return static_cast<int>(bytes*1000.f/ms);
+}
+void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
+    const vector<shared_ptr<Server>>& servers,
+    const std::map<int, shared_ptr<Trainer::ParamShard>>& shards){
   auto cluster=Cluster::Get();
   procs_id_=cluster->procs_id();
+  LOG(INFO)<<"Stub in process "<<procs_id_<<" starts";
   map<int, shared_ptr<Dealer>> interprocs_dealers;
   std::queue<Msg*> msg_queue;
   bool stop=false;
+  auto start=std::chrono::system_clock::now();
+  float amount=0.f;
+  Poller poll;
+  poll.Add(router_.get());
+  int sync_server=0, nworkers=workers.size(), nservers=servers.size();
   while(!stop){
+    Socket *sock=poll.Wait(cluster->poll_time());
+    if(poll.Terminated()){
+      LOG(ERROR)<<"Connection broken!";
+      exit(1);
+    }else if(sock==nullptr){
+      if(cluster->nserver_groups()>1&&!servers.empty()&&
+          bandwidth(amount, start)<cluster->bandwidth()){
+        Msg* msg=new Msg();
+        msg->set_src(-1,-1, kStub);
+        msg->set_dst(servers[sync_server]->group_id(),
+            servers[sync_server]->server_id(), kServer);
+        msg->set_type(kSyncReminder);
+        sync_server=(sync_server+1)%servers.size();
+        router_->Send(&msg);
+        //LOG(ERROR)<<"Reminder";
+      }
+      continue;
+    }
     Msg* msg=router_->Receive();
     if(msg==nullptr){
       LOG(ERROR)<<"Connection broken!";
@@ -360,6 +399,7 @@ void Trainer::Run(int nworkers, int nservers){
                 msg_queue.push(x);
               break;
             default:
+              LOG(ERROR)<<"Unknown message type:"<<type;
               break;
           }
         }else{
@@ -374,12 +414,30 @@ void Trainer::Run(int nworkers, int nservers){
               msg->dst_second(), msg->dst_flag());
         }
         if(dst_procs_id!=procs_id_){
+          // forward to other procs
+          if (interprocs_dealers.find(dst_procs_id)==interprocs_dealers.end()){
+            auto dealer=make_shared<Dealer>();
+            interprocs_dealers[dst_procs_id]=dealer;
+            dealer->Connect("tcp://"+cluster->endpoint(dst_procs_id));
+          }
+          if(bandwidth(amount, start) <=cluster->bandwidth()){
+            start=std::chrono::system_clock::now();
+            amount=0;
+          }
+          amount+=msg->size();
+          interprocs_dealers[dst_procs_id]->Send(&msg);
         }else{
+          if(type==kSyncRequest){
+            char buf[32];
+            sprintf(buf, "%d", cluster->bandwidth()-bandwidth(amount, start));
+            msg->add_frame(buf, strlen(buf));
+          }
           router_->Send(&msg);
         }
       }
     }
   }
+  LOG(INFO)<<"Stub in process "<<procs_id_<<" stops";
 }
 Msg* Trainer::HandleConnect(Msg** msg){
   string ping((char*)(*msg)->frame_data(), (*msg)->frame_size());
@@ -394,7 +452,6 @@ Msg* Trainer::HandleConnect(Msg** msg){
   *msg=NULL;
   return reply;
 }
-
 const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
   Msg* msgg=*msg;
   vector<Msg*> replies;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 788e77c..37acb14 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -47,6 +47,7 @@ void Worker::ConnectStub(shared_ptr<Dealer> dealer, EntityType type){
 }
 
 void Worker::Run(){
+  LOG(INFO)<<"Worker (group_id= "<<group_id_<<", id="<<worker_id_<<") starts";
   dealer_=make_shared<Dealer>(2*thread_id_);
   ConnectStub(dealer_, kWorkerParam);
   for(auto layer: train_net_->layers())
@@ -61,8 +62,10 @@ void Worker::Run(){
   for(auto layer: train_net_->layers()){
     if(layer->partitionid()==worker_id_)
       for(auto param: layer->GetParams()){
+        // only owners fill the memory of parameter values.
+        // others share the memory with owners hence do not need to put/get.
         if(param->owner() == param->id()){
-          if(group_id_==0)
+          if(group_id_%Cluster::Get()->nworker_groups_per_server_group()==0)  // NOTE(review): assumes nworker_groups_per_server_group() > 0
             param->InitValues(0);
           else
             Get(param, modelproto_.warmup_steps());
@@ -70,7 +73,7 @@ void Worker::Run(){
       }
   }
   Metric perf;
-  if(group_id_==0){
+  if(group_id_%Cluster::Get()->nworker_groups_per_server_group()==0){  // same condition as the InitValues branch above: these groups run the warm-up steps
     for(step_=0;step_<modelproto_.warmup_steps();step_++)
       RunOneBatch(step_, &perf);
     for(auto layer: train_net_->layers()){
@@ -86,6 +89,7 @@ void Worker::Run(){
   }
 
   Stop();
+  LOG(INFO)<<"Worker (group_id= "<<group_id_<<", id="<<worker_id_<<") stops";
 }
 
 void Worker::Stop(){

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f4370118/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index deff6f4..4ad17ce 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -133,8 +133,12 @@ Msg* Param::GenUpdateMsg(bool copy, int idx){
   return msg;
 }
 
-Msg* Param::GenSyncMsg(){
-  return nullptr;
+Msg* Param::GenSyncMsg(bool copy, int v){  // NOTE(review): copy and v are not used in this body
+  Msg* msg=new Msg();
+  msg->set_type(kSyncRequest);
+  msg->set_target(id(), local_version());
+  msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  return msg;
 }
 
 Msg* Param::HandlePutMsg(Msg** msg){
@@ -146,9 +150,9 @@ Msg* Param::HandlePutMsg(Msg** msg){
   proto_.set_learning_rate_multiplier(lr);
   proto_.set_weight_decay_multiplier(wc);
   vector<int> shape{size};
-  grad_.Reshape(shape);
-  history_.Reshape(shape);
-  data_=std::make_shared<Blob<float>>(shape);
+  Setup(shape);  // presumably allocates grad_/history_/data_ like the removed lines; confirm in Setup(shape)
+  set_local_version((*msg)->target_second());
+  set_version((*msg)->target_second());
   if(ptr==nullptr){
     CHECK((*msg)->next_frame());
     CHECK_EQ(size* sizeof(float), (*msg)->frame_size());
@@ -201,6 +205,7 @@ Msg* Param::HandleSyncMsg(Msg** msg){
   return nullptr;
 }
 
+<<<<<<< HEAD
 int Param::ParseSyncResponseMsg(Msg** msg, int slice_idx){
   DeleteMsg(msg);
   return 1;