Posted to commits@singa.apache.org by wa...@apache.org on 2016/06/03 07:48:09 UTC

[04/60] incubator-singa git commit: SINGA-163 - Reorganize the project folder layout

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/updater.cc
----------------------------------------------------------------------
diff --git a/src/utils/updater.cc b/src/utils/updater.cc
deleted file mode 100644
index a2180d3..0000000
--- a/src/utils/updater.cc
+++ /dev/null
@@ -1,284 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/updater.h"
-
-#include "mshadow/cxxnet_op.h"
-#include "mshadow/tensor.h"
-#include "singa/utils/singleton.h"
-#include "singa/utils/factory.h"
-
-namespace singa {
-
-using mshadow::cpu;
-using mshadow::expr::F;
-using mshadow::op::sqrtop;
-using mshadow::op::square;
-using mshadow::Shape;
-using mshadow::Shape1;
-using mshadow::Tensor;
-using mshadow::TensorContainer;
-
-LRGenerator* LRGenerator::Create(const LRGenProto& proto) {
-  auto factory = Singleton<Factory<LRGenerator>>::Instance();
-  LRGenerator* gen = nullptr;
-  if (proto.has_user_type())
-    gen = factory->Create(proto.user_type());
-  else
-    gen = factory->Create(proto.type());
-  gen->Init(proto);
-  return gen;
-}
-
-float FixedStepLRGen::Get(int step) {
-  if (last_idx_ < proto_.fixedstep_conf().step_size() - 1
-      && step >= proto_.fixedstep_conf().step(last_idx_ + 1)) {
-    last_idx_++;
-  }
-  return proto_.fixedstep_conf().step_lr(last_idx_);
-}
-
-float StepLRGen::Get(int step) {
-  // integer division is intentional; do not cast int to float
-  int freq = proto_.step_conf().change_freq();
-  float lr = proto_.base_lr() * pow(proto_.step_conf().gamma(), step / freq);
-  // LOG_IF(INFO, step % freq == 0) << "Update learning rate to " << lr
-  //   << " @ step " << step;
-  return lr;
-}
-
-float LinearLRGen::Get(int step) {
-  int freq = proto_.linear_conf().change_freq();
-  float r = step * 1.0 / freq;
-  return (1.0 - r) * proto_.base_lr() + r * proto_.linear_conf().final_lr();
-}
-
-float ExpLRGen::Get(int step) {
-  int freq = proto_.exponential_conf().change_freq();
-  return proto_.base_lr() / pow(2, step * 1. / freq);
-}
-
-float InvLRGen::Get(int step) {
-  return proto_.base_lr() * pow(1.f + proto_.inverse_conf().gamma() * step,
-           - proto_.inverse_conf().pow());
-}
-
-float InvTLRGen::Get(int step) {
-  return proto_.base_lr() / (1 + step * 1. / proto_.inverset_conf().final_lr());
-}
-
-Updater* Updater::Create(const UpdaterProto& proto) {
-  auto factory = Singleton<Factory<Updater>>::Instance();
-  Updater* updater = nullptr;
-  if (proto.has_user_type())
-    updater = factory->Create(proto.user_type());
-  else
-    updater = factory->Create(proto.type());
-  updater->Init(proto);
-  return updater;
-}
-
-/**************** added for Python Binding ***************************/
-Updater* Updater::CreateUpdater(const string str) {
-  UpdaterProto conf;
-  conf.ParseFromString(str);
-  return Updater::Create(conf);
-}
-/***********************Python Binding end**************************/
-
-
-/***********************SGD with momentum******************************/
-void Updater::Init(const UpdaterProto& proto) {
-  momentum_ = proto.momentum();
-  weight_decay_ = proto.weight_decay();
-  lr_gen_ = LRGenerator::Create(proto.learning_rate());
-  clip_low_ = proto.clip_low();
-  clip_high_ = proto.clip_high();
-}
-
-void Updater::Clip(const float low, const float high, Param* param) {
-  Blob<float>* grad = param->mutable_grad();
-  float* ptr = grad->mutable_cpu_data();
-  for (int i = 0; i < grad->count(); i++) {
-    if (ptr[i] > high)
-      ptr[i] = high;
-    else if (ptr[i] < low)
-      ptr[i] = low;
-  }
-}
-
-void SGDUpdater::Update(int step, Param* param, float grad_scale) {
-  if (clip_high_ > clip_low_)
-    Clip(clip_low_, clip_high_, param);
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  float wd = weight_decay_ * param->wd_scale();
-  grad *= grad_scale;
-  if (wd > 0)  // L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  if (momentum_ > 0) {
-    Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-    history = history * momentum_ - lr * grad;
-    data += history;
-  } else {
-    grad *= -lr;
-    data += grad;
-  }
-}
-
-/***********************Nesterov******************************/
-void NesterovUpdater::Update(int step, Param* param, float grad_scale) {
-  if (clip_high_ > clip_low_)
-    Clip(clip_low_, clip_high_, param);
-
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-  TensorContainer<cpu, 1> tmp(s);
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  float wd = weight_decay_ * param->wd_scale();
-  grad *= grad_scale;
-  if (wd > 0)  // L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  Copy(tmp, history);
-  history = history * momentum_ + lr * grad;
-  tmp = history * (1 + momentum_) - tmp * momentum_;
-  data -= tmp;
-}
-/***********************AdaGrad******************************/
-void AdaGradUpdater::Update(int step, Param* param, float grad_scale) {
-  if (clip_high_ > clip_low_)
-    Clip(clip_low_, clip_high_, param);
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  float wd = weight_decay_ * param->wd_scale();
-  grad *= grad_scale;
-  if (wd > 0)  //  L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  history += F<square>(grad);
-  data -= lr * grad / (F<sqrtop>(history, proto_.delta()));
-}
-
-/***********************RMSProp******************************/
-void RMSPropUpdater::Init(const UpdaterProto& proto) {
-  Updater::Init(proto);
-  rho_ = proto.rmsprop_conf().rho();
-  delta_ = proto.delta();
-}
-
-void RMSPropUpdater::Update(int step, Param* param, float grad_scale) {
-  if (clip_high_ > clip_low_)
-    Clip(clip_low_, clip_high_, param);
-
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  float wd = weight_decay_ * param->wd_scale();
-  grad *= grad_scale;
-  if (wd > 0)  //  L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  history = history * rho_ + (1 - rho_) * F<square>(grad);
-  data -= lr * grad / F<sqrtop>(history, delta_);
-}
-/***********************AdaDelta******************************/
-void AdaDeltaUpdater::Init(const UpdaterProto& proto) {
-  Updater::Init(proto);
-  delta_ = proto.delta();
-  rho_ = proto.adadelta_conf().rho();
-}
-
-void AdaDeltaUpdater::Update(int step, Param* param, float grad_scale) {
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-  Tensor<cpu, 1> update(param->mutable_cpu_update(), s);
-  TensorContainer<cpu, 1> tmp(s);
-  float wd = weight_decay_ * param->wd_scale();
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  grad *= grad_scale;
-  if (wd > 0)  //  L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  history = history * rho_ + (1 - rho_) * F<op::square>(grad);
-  tmp = grad * F<op::sqrtop>(update, delta_) / F<op::sqrtop>(history, delta_);
-  update = rho_ * update + (1 - rho_) * F<op::square>(tmp);
-  data -= lr * tmp;
-}
-
-/***********************Adam******************************/
-void AdamUpdater::Init(const UpdaterProto &proto) {
-  Updater::Init(proto);
-  beta1_ = proto.adam_conf().beta1();
-  beta2_ = proto.adam_conf().beta2();
-  delta_ = proto.delta();
-}
-
-void AdamUpdater::Update(int step, Param* param, float grad_scale) {
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-  Tensor<cpu, 1> update(param->mutable_cpu_update(), s);
-  float wd = weight_decay_ * param->wd_scale();
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  grad *= grad_scale;
-  if (wd > 0)  //  L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  history = history * beta1_ + (1 - beta1_) * grad;
-  update = update * beta2_ + (1 - beta2_) * F<op::square>(grad);
-  data -= lr * history / F<op::sqrtop>(update, delta_);
-}
-
-/***********************AdamMax******************************/
-void AdamMaxUpdater::Init(const UpdaterProto &proto) {
-  Updater::Init(proto);
-  beta1_ = proto.adammax_conf().beta1();
-  beta2_ = proto.adammax_conf().beta2();
-  delta_ = proto.delta();
-}
-
-void AdamMaxUpdater::Update(int step, Param* param, float grad_scale) {
-  Shape<1> s = Shape1(param->size());
-  Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
-  Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
-  Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
-  Tensor<cpu, 1> update(param->mutable_cpu_update(), s);
-  float wd = weight_decay_ * param->wd_scale();
-  float lr = lr_gen_->Get(step) * param->lr_scale();
-  grad *= grad_scale;
-  if (wd > 0)  //  L2 regularization; applied after scaling by grad_scale
-    grad += data * wd;
-  history = history * beta1_ + (1 - beta1_) * grad;
-  update = update * beta2_;
-  grad = F<op::abs>(grad);
-  update = F<op::max>(update, grad) + delta_;
-  data -= lr * history / update;
-}
-
-}  // namespace singa
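
For reference, the core of the removed SGDUpdater::Update is the standard momentum
update: history = momentum * history - lr * (grad * grad_scale + wd * data), followed
by data += history. The sketch below restates that rule with plain float arrays instead
of mshadow expressions; it is only an illustration, and the name SgdMomentumStep is
hypothetical, not a SINGA API.

#include <cstddef>
#include <cstdio>
#include <vector>

// One SGD-with-momentum step over a flat parameter vector, mirroring the
// expression-template version in the deleted updater.cc.
void SgdMomentumStep(std::vector<float>* data, std::vector<float>* grad,
                     std::vector<float>* history, float grad_scale, float lr,
                     float wd, float momentum) {
  for (std::size_t i = 0; i < data->size(); ++i) {
    float g = (*grad)[i] * grad_scale;
    if (wd > 0) g += wd * (*data)[i];  // L2 regularization after gradient scaling
    if (momentum > 0) {
      (*history)[i] = momentum * (*history)[i] - lr * g;
      (*data)[i] += (*history)[i];
    } else {
      (*data)[i] -= lr * g;
    }
  }
}

int main() {
  std::vector<float> data{1.0f, -2.0f}, grad{0.5f, 0.25f}, history{0.0f, 0.0f};
  SgdMomentumStep(&data, &grad, &history, 1.0f, 0.1f, 0.0f, 0.9f);
  std::printf("%.3f %.3f\n", data[0], data[1]);  // prints 0.950 -2.025
  return 0;
}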

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/zk_service.cc
----------------------------------------------------------------------
diff --git a/src/utils/zk_service.cc b/src/utils/zk_service.cc
deleted file mode 100644
index 352f6f7..0000000
--- a/src/utils/zk_service.cc
+++ /dev/null
@@ -1,326 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-* 
-*   http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/zk_service.h"
-
-#include <glog/logging.h>
-#include <algorithm>
-
-using std::string;
-using std::to_string;
-using std::vector;
-
-namespace singa {
-
-void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
-                               const char *path, void *watcherCtx) {
-  // return if the callback has already been invoked
-  RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
-  if (cb->fn == nullptr) return;
-  if (type == ZOO_CHILD_EVENT) {
-    struct String_vector child;
-    // check the child list and put another watcher
-    int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
-    if (ret == ZOK) {
-      if (child.count == 0) {
-        LOG(INFO) << "child.count = 0 in path: " << path;
-        // all workers leave, we do callback now
-        (*cb->fn)(cb->ctx);
-        cb->fn = nullptr;
-      }
-    } else {
-      LOG(FATAL) << "Unhandled ZK error code: " << ret
-                 << " (zoo_wget_children " << path << ")";
-    }
-  } else {
-    LOG(FATAL) << "Unhandled callback type code: "<< type;
-  }
-}
-
-ZKService::~ZKService() {
-  // close zookeeper handler
-  zookeeper_close(zkhandle_);
-}
-
-char zk_cxt[] = "ZKClusterRT";
-
-bool ZKService::Init(const string& host, int timeout) {
-  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
-  zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0,
-                             static_cast<void *>(zk_cxt), 0);
-  if (zkhandle_ == NULL) {
-    LOG(ERROR) << "Error when connecting to zookeeper servers...";
-    LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
-    LOG(ERROR) << host.c_str();
-    return false;
-  }
-
-  return true;
-}
-
-bool ZKService::CreateNode(const char* path, const char* val, int flag,
-                               char* output) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  char buf[kZKBufSize];
-  int ret = 0;
-  // send the zk request
-  for (int i = 0; i < kNumRetry; ++i) {
-    ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val),
-                     &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize);
-    if (ret == ZNONODE) {
-      LOG(WARNING) << "zookeeper parent node of " << path
-                  << " not exist, retry later";
-    } else if (ret == ZCONNECTIONLOSS) {
-      LOG(WARNING) << "zookeeper disconnected, retry later";
-    } else {
-      break;
-    }
-    sleep(kSleepSec);
-  }
-  // copy the node name to output
-  if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
-    snprintf(output, kZKBufSize, "%s", buf);
-    // use snprintf instead of strcpy
-    // strcpy(output, buf);
-  }
-  if (ret == ZOK) {
-    LOG(INFO) << "created zookeeper node " << buf
-              << " (" << (val == nullptr ? "NULL" : val) << ")";
-    return true;
-  } else if (ret == ZNODEEXISTS) {
-    LOG(WARNING) << "zookeeper node " << path << " already exists";
-    return true;
-  } else if (ret == ZCONNECTIONLOSS) {
-    LOG(ERROR) << "Cannot connect to zookeeper, "
-               << "please ensure it is running properly...\n"
-               << "If want to use zookeeper in our thirdparty folder, "
-               << "you can start it by:\n"
-               << "$ ./bin/zk-service.sh start";
-    return false;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_create " << path << ")";
-  return false;
-}
-
-bool ZKService::DeleteNode(const char* path) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  int ret = zoo_delete(zkhandle_, path, -1);
-  if (ret == ZOK) {
-    LOG(INFO) << "deleted zookeeper node " << path;
-    return true;
-  } else if (ret == ZNONODE) {
-    LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
-    return true;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_delete " << path << ")";
-  return false;
-}
-
-bool ZKService::Exist(const char* path) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct Stat stat;
-  int ret = zoo_exists(zkhandle_, path, 0, &stat);
-  if (ret == ZOK) return true;
-  else if (ret == ZNONODE) return false;
-  LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
-  return false;
-}
-
-bool ZKService::UpdateNode(const char* path, const char* val) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  // set version = -1, do not check content version
-  int ret = zoo_set(zkhandle_, path, val, strlen(val), -1);
-  if (ret == ZOK) {
-    return true;
-  } else if (ret == ZNONODE) {
-    LOG(ERROR) << "zk node " << path << " does not exist";
-    return false;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get " << path << ")";
-  return false;
-}
-
-bool ZKService::GetNode(const char* path, char* output) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct Stat stat;
-  int val_len = kZKBufSize;
-  int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
-  if (ret == ZOK) {
-    output[val_len] = '\0';
-    return true;
-  } else if (ret == ZNONODE) {
-    LOG(ERROR) << "zk node " << path << " does not exist";
-    return false;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get " << path << ")";
-  return false;
-}
-
-bool ZKService::GetChild(const char* path, vector<string>* vt) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct String_vector child;
-  int ret = zoo_get_children(zkhandle_, path, 0, &child);
-  if (ret == ZOK) {
-    vt->clear();
-    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
-    return true;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get_children " << path << ")";
-  return false;
-}
-
-bool ZKService::WGetChild(const char* path, vector<string>* vt,
-                            RTCallback *cb) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct String_vector child;
-  int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
-  if (ret == ZOK) {
-    vt->clear();
-    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
-    return true;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get_children " << path << ")";
-  return false;
-}
-
-
-void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
-                                const char *path, void *watcherCtx) {
-  if (type == ZOO_SESSION_EVENT) {
-    if (state == ZOO_CONNECTED_STATE)
-      LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
-    else if (state == ZOO_EXPIRED_SESSION_STATE)
-      LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
-  }
-}
-
-ZKClusterRT::ZKClusterRT(const string& host, int job_id) {
-  host_ = host;
-  workspace_ = GetZKJobWorkspace(job_id);
-  group_path_ = workspace_ + kZKPathJobGroup;
-  proc_path_ = workspace_ + kZKPathJobProc;
-  proc_lock_path_ = workspace_ + kZKPathJobPLock;
-}
-
-ZKClusterRT::~ZKClusterRT() {
-  // release callback vector
-  for (RTCallback* p : cb_vec_) {
-    delete p;
-  }
-}
-
-bool ZKClusterRT::Init() {
-  if (!zk_.Init(host_, timeout_)) return false;
-  if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr))
-    return false;
-  return true;
-}
-
-int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
-  char buf[kZKBufSize];
-  string lock = proc_lock_path_ + "/lock-";
-  if (!zk_.CreateNode(lock.c_str(), nullptr,
-                        ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
-    return -1;
-  }
-  // get all children in lock folder
-  vector<string> vt;
-  if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) {
-    return -1;
-  }
-  // find own position among all locks
-  int id = -1;
-  std::sort(vt.begin(), vt.end());
-  for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
-    if (proc_lock_path_+"/"+vt[i] == buf) {
-      id = i;
-      break;
-    }
-  }
-  if (id == -1) {
-    LOG(ERROR) << "cannot find own node " << buf;
-    return -1;
-  }
-  // create a new node in proc path
-  string path = proc_path_ + "/proc-" + to_string(id);
-  string content = host_addr + "|" + to_string(pid);
-  if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL,
-                      nullptr)) {
-    return -1;
-  }
-  return id;
-}
-
-std::string ZKClusterRT::GetProcHost(int proc_id) {
-  char val[kZKBufSize];
-  // construct file name
-  string path = proc_path_ + "/proc-" + to_string(proc_id);
-  if (!zk_.GetNode(path.c_str(), val)) return "";
-  int len = strlen(val) - 1;
-  while (len && val[len] != '|') --len;
-  CHECK(len);
-  val[len] = '\0';
-  return string(val);
-}
-
-bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
-  CHECK_NOTNULL(fn);
-  string path = groupPath(gid);
-  // create zk node
-  if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
-  vector<string> child;
-  // store the callback function and context for later usage
-  RTCallback *cb = new RTCallback;
-  cb->fn = fn;
-  cb->ctx = ctx;
-  cb_vec_.push_back(cb);
-  // start watching the zk node; the initial child list is not needed here
-  return zk_.WGetChild(path.c_str(), &child, cb);
-}
-
-bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
-  string path = groupPath(s_group) + workerPath(gid, wid);
-  // try to create an ephemeral node under server group path
-  return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
-}
-
-bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
-  string path = groupPath(s_group) + workerPath(gid, wid);
-  return zk_.DeleteNode(path.c_str());
-}
-
-}  // namespace singa
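
For reference, ZKClusterRT::RegistProc in the removed file assigns each process an id by
creating an ephemeral, sequential lock node and then ranking its own node among the sorted
children of the lock directory. The sketch below shows only that ranking step, with the
zookeeper round-trips replaced by an in-memory child list; AssignProcId is a hypothetical
name, not a SINGA or ZooKeeper API.

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Return the rank of `own` among the sorted lock-node names, or -1 if missing.
// Sequential znode suffixes are zero-padded, so lexicographic order matches
// creation order and the rank is stable across processes.
int AssignProcId(std::vector<std::string> locks, const std::string& own) {
  std::sort(locks.begin(), locks.end());
  for (int i = 0; i < static_cast<int>(locks.size()); ++i)
    if (locks[i] == own) return i;
  return -1;
}

int main() {
  std::vector<std::string> locks{"lock-0000000002", "lock-0000000000",
                                 "lock-0000000001"};
  std::cout << AssignProcId(locks, "lock-0000000001") << std::endl;  // prints 1
  return 0;
}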

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/worker.cc
----------------------------------------------------------------------
diff --git a/src/worker.cc b/src/worker.cc
deleted file mode 100644
index e92d780..0000000
--- a/src/worker.cc
+++ /dev/null
@@ -1,545 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/worker.h"
-
-#include <glog/logging.h>
-#include <chrono>
-#include <thread>
-#include <typeinfo>
-#include "singa/utils/cluster.h"
-#include "singa/utils/factory.h"
-#include "singa/utils/singleton.h"
-#include "singa/utils/context.h"
-#include "singa/utils/math_blob.h"
-
-namespace singa {
-
-using std::string;
-
-Worker* Worker::CreateWorker(const string str) {
-  AlgProto alg_proto;
-  alg_proto.ParseFromString(str);
-  return Worker::Create(alg_proto);
-}
-
-Worker* Worker::Create(const AlgProto& conf) {
-  auto factory = Singleton<Factory<singa::Worker>>::Instance();
-  Worker* worker = nullptr;
-  if (conf.has_user_alg())
-    worker = factory->Create(conf.user_alg());
-  else
-    worker = factory->Create(conf.alg());
-  return worker;
-}
-
-void Worker::Setup(int grp_id, int id, const JobProto& conf,
-    NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net) {
-  grp_id_ = grp_id;
-  id_ = id;
-  job_conf_ = conf;
-  train_net_ = train_net;
-  val_net_ = val_net;
-  test_net_ = test_net;
-  InitSockets(train_net);
-}
-
-Worker::~Worker() {
-  if (dealer_) delete dealer_;
-  if (bridge_dealer_) delete bridge_dealer_;
-}
-
-void Worker::Run() {
-  // setup gpu device
-  auto context = Singleton<Context>::Instance();
-  // TODO(wangwei) -2 for uninitialized device; -1 for CPU; >=0 for GPU now.
-  int device = -2;
-  while (device == -2) {
-    device = context->device_id(std::this_thread::get_id());
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-  }
-  LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") "
-    << " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU");
-  if (device >= 0)
-    context->ActivateDevice(device);
-
-  auto cluster = Cluster::Get();
-  int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
-  CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
-  step_ = job_conf_.step();
-  InitSockets(train_net_);
-  InitNetParams(job_conf_, train_net_);
-  while (!StopNow(step_)) {
-    if (ValidateNow(step_) && val_net_ != nullptr) {
-      CollectAll(step_, train_net_);
-      LOG(ERROR) << "Validation @ step " + std::to_string(step_);
-      Test(job_conf_.validate_steps(), kVal, val_net_);
-    }
-    if (TestNow(step_) && test_net_ != nullptr) {
-      CollectAll(step_, train_net_);
-      LOG(ERROR) << "Test @ step " + std::to_string(step_);
-      Test(job_conf_.test_steps(), kTest, test_net_);
-    }
-    if (CheckpointNow(step_) && grp_id_ == 0) {
-      CollectAll(step_, train_net_);
-      Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
-      job_conf_.set_step(step_);
-    }
-    TrainOneBatch(step_, train_net_);
-    if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0) {
-      Display(kTrain | kForward | kBackward,
-          "Train @ step " + std::to_string(step_), train_net_);
-    }
-    step_++;
-  }
-
-  // save the model
-  if (grp_id_ == 0)
-    Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
-  // clean up
-  cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
-  // notify the stub on worker stop
-  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
-  msg->set_type(kStop);
-  dealer_->Send(&msg);  // use param dealer to send the stop msg
-  LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
-}
-
-void Worker::Test(int steps, Phase phase, NeuralNet* net) {
-  for (int step = 0; step < steps; step++)
-    TestOneBatch(step, phase, net);
-  Display(phase, " ", net);
-}
-
-void Worker::InitSockets(const NeuralNet* net) {
-  dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerParam));
-  for (auto layer : net->layers()) {
-    if (layer->partition_id() == id_) {
-      if (typeid(*layer) == typeid(BridgeDstLayer)
-          || typeid(*layer) == typeid(BridgeSrcLayer)) {
-        bridge_dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerLayer));
-        break;
-      }
-    }
-  }
-  // bind dealer to bridge layers
-  if (bridge_dealer_ != nullptr) {
-    for (auto dst : net->layers()) {
-      if (typeid(*dst) == typeid(BridgeDstLayer)) {
-        auto src = net->srclayers(dst)[0];
-        name2bridge_[src->name()] = src;
-        name2bridge_[dst->name()] = dst;
-        if (src->partition_id() == id_) {
-          dynamic_cast<BridgeLayer*>(src)->MakePaired(dst, grp_id_,
-              bridge_dealer_, &name2bridge_);
-        }
-        if (dst->partition_id() == id_) {
-          dynamic_cast<BridgeLayer*>(dst)->MakePaired(src, grp_id_,
-              bridge_dealer_, &name2bridge_);
-        }
-      }
-    }
-  }
-}
-
-void Worker::InitNetParams(const std::string& folder, vector<Layer*> net) {
-  std::unordered_map<string, Param*> name2param;
-  for (auto layer : net) {
-    for (auto param : layer->GetParams()) {
-      // only owners fill the memory of parameter values.
-      // if (param->owner() == param->id()) {
-      CHECK(name2param.find(param->name()) == name2param.end());
-      name2param[param->name()] = param;
-      // }
-    }
-  }
-  vector<string> paths;
-  paths.push_back(folder);
-  NeuralNet::Load(paths, name2param);
-}
-
-void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) {
-  // for each server grp, its first subscriber worker grp does the param init
-  if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) {
-    // extract params that should be initialized by this worker
-    // must gen a name for each param if the user doesn't config it
-    std::unordered_map<string, Param*> name2param;
-    for (auto layer : net->layers()) {
-      if (layer->partition_id() == id_) {
-        for (auto param : layer->GetParams()) {
-          // only owners fill the memory of parameter values.
-          if (param->owner() == param->id()) {
-            CHECK(name2param.find(param->name()) == name2param.end());
-            name2param[param->name()] = param;
-          }
-        }
-      }
-    }
-    vector<string> paths;
-    for (const auto& p : job_conf_.checkpoint_path())
-      paths.push_back(p);
-    net->Load(paths, name2param);
-    // init other params that do not have a checkpoint version
-    for (auto entry : name2param) {
-      if (entry.second->version() > 0) {
-        //  if load from pre-training params, reset version to start step
-        if (job_conf.reset_param_version()) {
-          entry.second->set_version(job_conf.step());
-        }
-      } else {
-        entry.second->InitValues(job_conf.step());
-        if (!job_conf.reset_param_version())
-          LOG(ERROR) << "better reset version of params from checkpoints "
-            << "to the same as other newly initialized params!";
-      }
-    }
-
-    // warmup training before put params to servers
-    // for (; step_ < job_conf.warmup_steps(); step_++)
-    //  TrainOneBatch(step_, net);
-    for (auto layer : net->layers()) {
-      if (layer->partition_id() == id_)
-        for (auto param : layer->GetParams())
-          if (param->owner() == param->id())
-            Put(param->version(), param);
-    }
-  }
-  // wait for in-process owners to init params, so no get requests are sent
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-  for (auto layer : net->layers()) {
-    if (layer->partition_id() == id_)
-      for (auto param : layer->GetParams())
-        Get(job_conf.warmup_steps(), param);
-  }
-}
-
-void Worker::Checkpoint(int step, const std::string& folder, vector<Layer*> net) {
-  BlobProtos bps;
-  for (auto layer : net) {
-    //if (layer->partition_id() == id_) {
-      for (auto param : layer->GetParams()) {
-        // only owners fill the memory of parameter values.
-        //if (param->owner() == param->id()) {
-          auto *blob = bps.add_blob();
-          param->ToProto(blob);
-          bps.add_version(param->version());
-          bps.add_name(param->name());
-        //}
-      }
-    //}
-  }
-  char buf[256];
-  snprintf(buf, sizeof(buf), "%s/step%d-worker0", folder.c_str(), step);
-  LOG(INFO) << "checkpoint to " << buf;
-  WriteProtoToBinaryFile(bps, buf);
-}
-
-void Worker::Checkpoint(int step, const std::string& folder, NeuralNet* net) {
-  BlobProtos bps;
-  for (auto layer : net->layers()) {
-    if (layer->partition_id() == id_) {
-      for (auto param : layer->GetParams()) {
-        // only owners fill the memory of parameter values.
-        if (param->owner() == param->id()) {
-          auto *blob = bps.add_blob();
-          param->ToProto(blob);
-          bps.add_version(param->version());
-          bps.add_name(param->name());
-        }
-      }
-    }
-  }
-  char buf[256];
-  snprintf(buf, sizeof(buf), "%s/step%d-worker%d", folder.c_str(), step, id_);
-  LOG(INFO) << "checkpoint to " << buf;
-  WriteProtoToBinaryFile(bps, buf);
-}
-
-int Worker::Put(int step, Param* param) {
-  if (dealer_ == nullptr) {
-    LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
-    return 1;
-  }
-  // set Blob head to cpu to avoid calling cudaMemcpy by the stub thread, which
-  // would hang on some machines.
-  param->data().cpu_data();
-  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
-  msg->set_trgt(ParamTrgt(param->owner(), 0), step);
-  msg->set_type(kPut);
-  dealer_->Send(&msg);
-//  LOG(ERROR) << "worker msg " << msg;
-  return 1;
-}
-
-int Worker::Get(int step, Param* param) {
-  if (param->version() >= step)
-    return 1;
-  if (dealer_ == nullptr) {
-    LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
-    return 1;
-  }
-  // set Blob head to cpu to avoid calling cudaMemcpy by the stub thread, which
-  // would hang on some machines.
-  param->mutable_data()->mutable_cpu_data();
-
-  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
-  msg->set_trgt(ParamTrgt(param->owner(), 0), step);
-  msg->set_type(kGet);
-  dealer_->Send(&msg);
-  return 1;
-}
-
-int Worker::Update(int step, Param* param) {
-  param->set_last_version(param->version());
-  if (dealer_ == nullptr) {
-    LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
-    return 1;
-  }
-  // head of data Blob (SyncMem) to cpu, because the stub thread may use
-  // cudaMemcpy copy gradients into msgs. cudaMemcpy hangs when called by the
-  // stub thread on some GPU machines.
-  // TODO(wangwei) fix this issue and remove the following line.
-  // optimize for training with single worker by removing stub and server, and
-  // updating parameters locally inside the worker GPU. Then we do not need to
-  // transfer gradients and parameter values between GPU-CPU.
-  param->grad().cpu_data();
-  // change the head of SyncMem to cpu; otherwise, the updated parameter
-  // values would not be synced to gpu (since the head is at gpu).
-  param->mutable_data()->mutable_cpu_data();
-
-  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
-  msg->set_trgt(ParamTrgt(param->owner(), 0), step);
-  msg->set_type(kUpdate);
-  dealer_->Send(&msg);
-  return 1;
-}
-
-int Worker::CollectAll(int step, NeuralNet* net) {
-  auto& layers = net->layers();
-  for (auto& layer : layers) {
-    if (layer->partition_id() == id_) {
-      for (Param* p : layer->GetParams()) {
-        Collect(step, p);
-      }
-    }
-  }
-  return 1;
-}
-
-int Worker::Collect(int step, Param* param) {
-  while (param->version() <= param->last_version()) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime));
-    // LOG(ERROR) << "wait  "<< param->id() << " at " << step << " by " <<id_;
-  }
-  return 1;
-}
-
-void Worker::Display(int flag, const std::string& prefix, NeuralNet* net) {
-  for (auto layer : net->layers()) {
-    if (layer->partition_id() == id_) {
-      const string& disp = layer->ToString(false, flag);
-      if (disp.length())
-        LOG(ERROR) << prefix << "  " << disp;
-    }
-  }
-}
-
-/****************************BPWorker**********************************/
-void BPWorker::TrainOneBatch(int step, NeuralNet* net) {
-  Forward(step, kTrain, net);
-  Backward(step, net);
-}
-
-void BPWorker::TestOneBatch(int step, Phase phase, NeuralNet* net) {
-  Forward(step, phase, net);
-}
-
-void BPWorker::Forward(int step, Phase phase, NeuralNet* net) {
-  map<string, string> label;
-  for (auto& layer : net->layers()) {
-    if (layer->partition_id() == id_) {
-      if (phase == kTrain && layer->unroll_index() == 0) {
-        // wait until param is updated
-        for (Param* p : layer->GetParams()) {
-          Collect(step, p);
-        }
-      }
-      // DLOG(ERROR) << "Forward " << layer->name();
-      layer->ComputeFeature(phase | kForward, net->srclayers(layer));
-      if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-        label[layer->name()] = layer->ToString(true, phase | kForward);
-    }
-  }
-  if (label.size()) {
-    const string path = Cluster::Get()->vis_folder() + "/fp-step"
-      + std::to_string(step) +"-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).ToJson(label));
-  }
-}
-
-void BPWorker::Backward(int step, NeuralNet* net) {
-  map<string, string> label;
-  auto& layers = net->layers();
-  for (auto it = layers.rbegin(); it != layers.rend(); it++) {
-    Layer* layer = *it;
-    if (layer->partition_id() == id_) {
-      layer->ComputeGradient(kTrain | kBackward, net->srclayers(layer));
-      if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-        label[layer->name()] = layer->ToString(true, kTrain | kBackward);
-      for (Param* p : layer->GetParams())
-        Update(step, p);
-    }
-  }
-  if (label.size()) {
-    const string path = Cluster::Get()->vis_folder() + "/bp-step"
-      + std::to_string(step) + "-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).Reverse().ToJson(label));
-  }
-}
-
-/***************************BPTTWorker*********************************/
-void BPTTWorker::Forward(int step, Phase phase, NeuralNet* net) {
-  map<string, string> label;
-  for (auto& layer : net->layers()) {
-    if (layer->partition_id() == id_) {
-      if (phase == kTrain && layer->unroll_index() == 0) {
-        // wait until param is updated
-        for (Param* p : layer->GetParams()) {
-          Collect(step, p);
-          Zero(p->mutable_grad());
-        }
-      }
-      vector<Layer*> src = net->srclayers(layer);
-      if ((phase & kTest) && typeid(*layer) == typeid(RNNDummyLayer)) {
-        CHECK_LE(src.size(), 1);
-        auto dummy = dynamic_cast<RNNDummyLayer*>(layer);
-        Layer* srclayer = net->name2layer(dummy->srclayer(step));
-        if (step > 0)
-          CHECK(srclayer != nullptr);
-        if (srclayer != nullptr) {
-          src.clear();
-          src.push_back(srclayer);
-        }
-      }
-      // if this is a full-state rnn and not the start of a new pass over the
-      // dataset, feed the hidden state of the last unit to the first unit.
-      if (layer->unroll_index() == 0 && full_state_ && !begin_) {
-        Layer* last = net->last_unroll_layer(layer);
-        CHECK(last != nullptr);
-        if (last != layer || (phase & kTest))
-          src.push_back(last);
-      }
-      // LOG(ERROR) << layer->name() << " forward";
-      // int ret =
-      layer->ComputeFeature(phase | kForward, src);
-      /*
-      if ((phase & Phase::kTrain) && ret == Status::kEnd)
-        begin_ = true;
-      */
-      if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-        label[layer->name()] = layer->ToString(true, phase | kForward);
-    }
-  }
-  if (label.size()) {
-    const string path = Cluster::Get()->vis_folder() + "/fp-step"
-      + std::to_string(step) +"-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).ToJson(label));
-  }
-}
-
-void BPTTWorker::Backward(int step, NeuralNet* net) {
-  map<string, string> label;
-  auto& layers = net->layers();
-  for (auto it = layers.rbegin(); it != layers.rend(); it++) {
-    Layer* layer = *it;
-    if (layer->partition_id() == id_) {
-      layer->ComputeGradient(kTrain | kBackward | kAggGrad,
-          net->srclayers(layer));
-      // LOG(ERROR) << layer->name() << " backward";
-      if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-        label[layer->name()] = layer->ToString(true, kTrain | kBackward);
-      // unrolled layers share parameter data and grad, just update the 1st one
-      if (layer->unroll_index() == 0)
-        for (Param* p : layer->GetParams())
-          Update(step, p);
-    }
-  }
-  if (label.size()) {
-    const string path = Cluster::Get()->vis_folder() + "/bp-step"
-      + std::to_string(step) + "-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).Reverse().ToJson(label));
-  }
-}
-void BPTTWorker::Display(int flag, const std::string& prefix, NeuralNet* net) {
-  std::unordered_map<string, float> perf;
-  for (auto layer : net->layers()) {
-    if (layer->partition_id() == id_) {
-      const string& disp = layer->ToString(false, flag);
-      for (const auto& entry : GetMetricFromString(disp))
-        perf[entry.first] += entry.second;
-    }
-  }
-  string disp = prefix + " ";
-  for (const auto& entry : perf)
-    disp += entry.first + " = " + std::to_string(entry.second) + ", ";
-  LOG(ERROR) << disp;
-}
-/****************************CDWorker**********************************/
-void CDWorker::TrainOneBatch(int step, NeuralNet* net) {
-  const auto& layers = net->layers();
-  for (auto* layer : layers) {
-    for (Param* p : layer->GetParams())  // wait until param is updated
-      Collect(step, p);
-    layer->ComputeFeature(kPositive, net->srclayers(layer));
-  }
-  for (auto* layer : layers)
-    if (typeid(*layer) == typeid(RBMVisLayer)
-          || typeid(*layer) == typeid(RBMHidLayer))
-      layer->ComputeFeature(kNegative | kTest, net->srclayers(layer));
-  for (int i = 1; i < job_conf_.train_one_batch().cd_conf().cd_k(); i++) {
-    for (auto* layer : layers) {
-      if (typeid(*layer) == typeid(RBMVisLayer)
-          || typeid(*layer) == typeid(RBMHidLayer))
-        layer->ComputeFeature(kNegative, net->srclayers(layer));
-    }
-  }
-  for (auto* layer : layers) {
-    if (typeid(*layer) == typeid(RBMVisLayer)
-        || typeid(*layer) == typeid(RBMHidLayer)) {
-      layer->ComputeGradient(kTrain, net->srclayers(layer));
-      for (Param* p : layer->GetParams()) {
-        Update(step, p);
-      }
-    }
-  }
-}
-
-void CDWorker::TestOneBatch(int step, Phase phase, NeuralNet* net) {
-  auto& layers = net->layers();
-  for (auto *layer : layers)
-    layer->ComputeFeature(kPositive, net->srclayers(layer));
-  for (auto *layer : layers)
-    if (typeid(*layer) == typeid(RBMVisLayer))
-      layer->ComputeFeature(kNegative | kTest, net->srclayers(layer));
-}
-
-}  // namespace singa
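
For reference, the control flow of the removed Worker::Run is, in essence: until the stop
condition is reached, optionally validate, test, and checkpoint, then train one batch and
advance the step counter. The sketch below is only an outline of that loop under assumed
frequency fields; FakeConf and its members are made-up names, not SINGA APIs.

#include <iostream>

// Hypothetical stand-ins for the job-configuration checks in Worker::Run.
struct FakeConf {
  int max_steps = 6;        // stands in for StopNow(step_)
  int validate_freq = 2;    // stands in for ValidateNow(step_); 0 disables validation
  int checkpoint_freq = 3;  // stands in for CheckpointNow(step_); 0 disables checkpoints
};

int main() {
  FakeConf conf;
  for (int step = 0; step < conf.max_steps; ++step) {
    if (conf.validate_freq && step > 0 && step % conf.validate_freq == 0)
      std::cout << "Validation @ step " << step << "\n";
    if (conf.checkpoint_freq && step > 0 && step % conf.checkpoint_freq == 0)
      std::cout << "Checkpoint @ step " << step << "\n";
    std::cout << "TrainOneBatch @ step " << step << "\n";
  }
  return 0;
}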