You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/06/03 07:48:09 UTC
[04/60] incubator-singa git commit: SINGA-163 - Reorganize the
project folder layout
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/updater.cc
----------------------------------------------------------------------
diff --git a/src/utils/updater.cc b/src/utils/updater.cc
deleted file mode 100644
index a2180d3..0000000
--- a/src/utils/updater.cc
+++ /dev/null
@@ -1,284 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/updater.h"
-
-#include "mshadow/cxxnet_op.h"
-#include "mshadow/tensor.h"
-#include "singa/utils/singleton.h"
-#include "singa/utils/factory.h"
-
-namespace singa {
-
-using mshadow::cpu;
-using mshadow::expr::F;
-using mshadow::op::sqrtop;
-using mshadow::op::square;
-using mshadow::Shape;
-using mshadow::Shape1;
-using mshadow::Tensor;
-using mshadow::TensorContainer;
-
-LRGenerator* LRGenerator::Create(const LRGenProto& proto) {
- auto factory = Singleton<Factory<LRGenerator>>::Instance();
- LRGenerator* gen = nullptr;
- if (proto.has_user_type())
- gen = factory->Create(proto.user_type());
- else
- gen = factory->Create(proto.type());
- gen->Init(proto);
- return gen;
-}
-
-float FixedStepLRGen::Get(int step) {
- if (last_idx_ < proto_.fixedstep_conf().step_size() - 1
- && step >= proto_.fixedstep_conf().step(last_idx_ + 1)) {
- last_idx_++;
- }
- return proto_.fixedstep_conf().step_lr(last_idx_);
-}
-
-float StepLRGen::Get(int step) {
- // do not cast int to float
- int freq = proto_.step_conf().change_freq();
- float lr = proto_.base_lr() * pow(proto_.step_conf().gamma(), step / freq);
- // LOG_IF(INFO, step % freq == 0) << "Update learning rate to " << lr
- // << " @ step " << step;
- return lr;
-}
-
-float LinearLRGen::Get(int step) {
- int freq = proto_.linear_conf().change_freq();
- float r = step * 1.0 / freq;
- return (1.0 - r) * proto_.base_lr() + r * proto_.linear_conf().final_lr();
-}
-
-float ExpLRGen::Get(int step) {
- int freq = proto_.exponential_conf().change_freq();
- return proto_.base_lr() / pow(2, step * 1. / freq);
-}
-
-float InvLRGen::Get(int step) {
- return proto_.base_lr() * pow(1.f + proto_.inverse_conf().gamma() * step,
- - proto_.inverse_conf().pow());
-}
-
-float InvTLRGen::Get(int step) {
- return proto_.base_lr() / (1 + step * 1. / proto_.inverset_conf().final_lr());
-}
-
-Updater* Updater::Create(const UpdaterProto& proto) {
- auto factory = Singleton<Factory<Updater>>::Instance();
- Updater* updater = nullptr;
- if (proto.has_user_type())
- updater = factory->Create(proto.user_type());
- else
- updater = factory->Create(proto.type());
- updater->Init(proto);
- return updater;
-}
-
-/**************** added for Python Binding ***************************/
-Updater* Updater::CreateUpdater(const string str) {
- UpdaterProto conf;
- conf.ParseFromString(str);
- return Updater::Create(conf);
-}
-/***********************Python Binding end**************************/
-
-
-/***********************SGD with momentum******************************/
-void Updater::Init(const UpdaterProto& proto) {
- momentum_ = proto.momentum();
- weight_decay_ = proto.weight_decay();
- lr_gen_ = LRGenerator::Create(proto.learning_rate());
- clip_low_ = proto.clip_low();
- clip_high_ = proto.clip_high();
-}
-
-void Updater::Clip(const float low, const float high, Param* param) {
- Blob<float>* grad = param->mutable_grad();
- float* ptr = grad->mutable_cpu_data();
- for (int i = 0; i < grad->count(); i++) {
- if (ptr[i] > high)
- ptr[i] = high;
- else if (ptr[i] < low)
- ptr[i] = low;
- }
-}
-
-void SGDUpdater::Update(int step, Param* param, float grad_scale) {
- if (clip_high_ > clip_low_)
- Clip(clip_low_, clip_high_, param);
- Shape<1> s = Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- float lr = lr_gen_->Get(step) * param->lr_scale();
- float wd = weight_decay_ * param->wd_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- if (momentum_ > 0) {
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- history = history * momentum_ - lr * grad;
- data += history;
- } else {
- grad *= -lr;
- data += grad;
- }
-}
-
-/***********************Nesterov******************************/
-void NesterovUpdater::Update(int step, Param* param, float grad_scale) {
- if (clip_high_ > clip_low_)
- Clip(clip_low_, clip_high_, param);
-
- Shape<1> s = Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- TensorContainer<cpu, 1> tmp(s);
- float lr = lr_gen_->Get(step)*param->lr_scale();
- float wd = weight_decay_*param->wd_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- Copy(tmp, history);
- history = history * momentum_ + lr * grad;
- tmp = history * (1 + momentum_) - tmp * momentum_;
- data -= tmp;
-}
-/***********************AdaGrad******************************/
-void AdaGradUpdater::Update(int step, Param* param, float grad_scale) {
- if (clip_high_ > clip_low_)
- Clip(clip_low_, clip_high_, param);
- Shape<1> s = Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- float lr = lr_gen_->Get(step)*param->lr_scale();
- float wd = weight_decay_*param->wd_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- history += F<square>(grad);
- data -= lr * grad / (F<sqrtop>(history, proto_.delta()));
-}
-
-/***********************RMSProp******************************/
-void RMSPropUpdater::Init(const UpdaterProto& proto) {
- Updater::Init(proto);
- rho_ = proto.rmsprop_conf().rho();
- delta_ = proto.delta();
-}
-
-void RMSPropUpdater::Update(int step, Param* param, float grad_scale) {
- if (clip_high_ > clip_low_)
- Clip(clip_low_, clip_high_, param);
-
- Shape<1> s=Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- float lr = lr_gen_->Get(step) * param->lr_scale();
- float wd = weight_decay_ * param->wd_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- history = history * rho_ + (1 - rho_) * F<square>(grad);
- data -= lr * grad / F<sqrtop>(history, delta_);
-}
-/***********************AdaDelta******************************/
-void AdaDeltaUpdater::Init(const UpdaterProto& proto){
- Updater::Init(proto);
- delta_ = proto.delta();
- rho_=proto.adadelta_conf().rho();
-}
-
-void AdaDeltaUpdater::Update(int step, Param* param, float grad_scale){
- Shape<1> s=Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- Tensor<cpu, 1> update(param->mutable_cpu_update(), s);
- TensorContainer<cpu, 1> tmp(s);
- float wd = weight_decay_*param->wd_scale();
- float lr = lr_gen_->Get(step) * param->lr_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- history = history * rho_ + (1 - rho_) * F<op::square>(grad);
- tmp = grad * F<op::sqrtop>(update, delta_) / F<op::sqrtop>(history, delta_);
- update = rho_ * update + (1 - rho_) * F<op::square>(tmp);
- data -= lr * tmp;
-}
-
-/***********************Adam******************************/
-void AdamUpdater::Init(const UpdaterProto &proto) {
- Updater::Init(proto);
- beta1_=proto.adam_conf().beta1();
- beta2_=proto.adam_conf().beta2();
- delta_ = proto.delta();
-}
-
-void AdamUpdater::Update(int step, Param* param, float grad_scale) {
- Shape<1> s=Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- Tensor<cpu, 1> update(param->mutable_cpu_update(), s);
- float wd = weight_decay_*param->wd_scale();
- float lr = lr_gen_->Get(step) * param->lr_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- history = history * beta1_ + (1 - beta1_) * grad;
- update = update * beta2_ + (1 - beta2_) * F<op::square>(grad);
- data -= lr * history / F<op::sqrtop>(update, delta_);
-}
-
-/***********************AdamMax******************************/
-void AdamMaxUpdater::Init(const UpdaterProto &proto) {
- Updater::Init(proto);
- beta1_=proto.adammax_conf().beta1();
- beta2_=proto.adammax_conf().beta2();
- delta_=proto.delta();
-}
-
-void AdamMaxUpdater::Update(int step, Param* param, float grad_scale) {
- Shape<1> s=Shape1(param->size());
- Tensor<cpu, 1> data(param->mutable_cpu_data(), s);
- Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s);
- Tensor<cpu, 1> history(param->mutable_cpu_history(), s);
- Tensor<cpu, 1> update(param->mutable_cpu_update(), s);
- float wd = weight_decay_*param->wd_scale();
- float lr = lr_gen_->Get(step) * param->lr_scale();
- grad *= grad_scale;
- if (wd > 0) // L2 regularization, should be done after timing grad_scale
- grad += data * wd;
- history = history * beta1_ + (1 - beta1_) * grad;
- update = update * beta2_;
- grad = F<op::abs>(grad);
- update = F<op::max>(update, grad) + delta_;
- data -= lr * history / update;
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/zk_service.cc
----------------------------------------------------------------------
diff --git a/src/utils/zk_service.cc b/src/utils/zk_service.cc
deleted file mode 100644
index 352f6f7..0000000
--- a/src/utils/zk_service.cc
+++ /dev/null
@@ -1,326 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
#include "singa/utils/zk_service.h"

#include <algorithm>
#include <cstring>

#include <glog/logging.h>
-
-using std::string;
-using std::to_string;
-using std::vector;
-
-namespace singa {
-
-void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
- const char *path, void *watcherCtx) {
- // check if already callback
- RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
- if (cb->fn == nullptr) return;
- if (type == ZOO_CHILD_EVENT) {
- struct String_vector child;
- // check the child list and put another watcher
- int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
- if (ret == ZOK) {
- if (child.count == 0) {
- LOG(INFO) << "child.count = 0 in path: " << path;
- // all workers leave, we do callback now
- (*cb->fn)(cb->ctx);
- cb->fn = nullptr;
- }
- } else {
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_wget_children " << path << ")";
- }
- } else {
- LOG(FATAL) << "Unhandled callback type code: "<< type;
- }
-}
-
-ZKService::~ZKService() {
- // close zookeeper handler
- zookeeper_close(zkhandle_);
-}
-
// Context pointer handed to zookeeper_init() in ZKService::Init; the
// WatcherGlobal callback in this file does not read it.
char zk_cxt[sizeof("ZKClusterRT")] = "ZKClusterRT";
-
-bool ZKService::Init(const string& host, int timeout) {
- zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
- zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0,
- static_cast<void *>(zk_cxt), 0);
- if (zkhandle_ == NULL) {
- LOG(ERROR) << "Error when connecting to zookeeper servers...";
- LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
- LOG(ERROR) << host.c_str();
- return false;
- }
-
- return true;
-}
-
-bool ZKService::CreateNode(const char* path, const char* val, int flag,
- char* output) {
- CHECK(zkhandle_) << "zk handler not initialized";
- char buf[kZKBufSize];
- int ret = 0;
- // send the zk request
- for (int i = 0; i < kNumRetry; ++i) {
- ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val),
- &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize);
- if (ret == ZNONODE) {
- LOG(WARNING) << "zookeeper parent node of " << path
- << " not exist, retry later";
- } else if (ret == ZCONNECTIONLOSS) {
- LOG(WARNING) << "zookeeper disconnected, retry later";
- } else {
- break;
- }
- sleep(kSleepSec);
- }
- // copy the node name to output
- if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
- snprintf(output, kZKBufSize, "%s", buf);
- // use snprintf instead of strcpy
- // strcpy(output, buf);
- }
- if (ret == ZOK) {
- LOG(INFO) << "created zookeeper node " << buf
- << " (" << (val == nullptr ? "NULL" : val) << ")";
- return true;
- } else if (ret == ZNODEEXISTS) {
- LOG(WARNING) << "zookeeper node " << path << " already exists";
- return true;
- } else if (ret == ZCONNECTIONLOSS) {
- LOG(ERROR) << "Cannot connect to zookeeper, "
- << "please ensure it is running properly...\n"
- << "If want to use zookeeper in our thirdparty folder, "
- << "you can start it by:\n"
- << "$ ./bin/zk-service.sh start";
- return false;
- }
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_create " << path << ")";
- return false;
-}
-
-bool ZKService::DeleteNode(const char* path) {
- CHECK(zkhandle_) << "zk handler not initialized";
- int ret = zoo_delete(zkhandle_, path, -1);
- if (ret == ZOK) {
- LOG(INFO) << "deleted zookeeper node " << path;
- return true;
- } else if (ret == ZNONODE) {
- LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
- return true;
- }
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_delete " << path << ")";
- return false;
-}
-
-bool ZKService::Exist(const char* path) {
- CHECK(zkhandle_) << "zk handler not initialized";
- struct Stat stat;
- int ret = zoo_exists(zkhandle_, path, 0, &stat);
- if (ret == ZOK) return true;
- else if (ret == ZNONODE) return false;
- LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
- return false;
-}
-
-bool ZKService::UpdateNode(const char* path, const char* val) {
- CHECK(zkhandle_) << "zk handler not initialized";
- // set version = -1, do not check content version
- int ret = zoo_set(zkhandle_, path, val, strlen(val), -1);
- if (ret == ZOK) {
- return true;
- } else if (ret == ZNONODE) {
- LOG(ERROR) << "zk node " << path << " does not exist";
- return false;
- }
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_get " << path << ")";
- return false;
-}
-
-bool ZKService::GetNode(const char* path, char* output) {
- CHECK(zkhandle_) << "zk handler not initialized";
- struct Stat stat;
- int val_len = kZKBufSize;
- int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
- if (ret == ZOK) {
- output[val_len] = '\0';
- return true;
- } else if (ret == ZNONODE) {
- LOG(ERROR) << "zk node " << path << " does not exist";
- return false;
- }
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_get " << path << ")";
- return false;
-}
-
-bool ZKService::GetChild(const char* path, vector<string>* vt) {
- CHECK(zkhandle_) << "zk handler not initialized";
- struct String_vector child;
- int ret = zoo_get_children(zkhandle_, path, 0, &child);
- if (ret == ZOK) {
- vt->clear();
- for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
- return true;
- }
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_get_children " << path << ")";
- return false;
-}
-
-bool ZKService::WGetChild(const char* path, vector<string>* vt,
- RTCallback *cb) {
- CHECK(zkhandle_) << "zk handler not initialized";
- struct String_vector child;
- int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
- if (ret == ZOK) {
- vt->clear();
- for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
- return true;
- }
- LOG(FATAL) << "Unhandled ZK error code: " << ret
- << " (zoo_get_children " << path << ")";
- return false;
-}
-
-
-void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
- const char *path, void *watcherCtx) {
- if (type == ZOO_SESSION_EVENT) {
- if (state == ZOO_CONNECTED_STATE)
- LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
- else if (state == ZOO_EXPIRED_SESSION_STATE)
- LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
- }
-}
-
-ZKClusterRT::ZKClusterRT(const string& host, int job_id) {
- host_ = host;
- workspace_ = GetZKJobWorkspace(job_id);
- group_path_ = workspace_ + kZKPathJobGroup;
- proc_path_ = workspace_ + kZKPathJobProc;
- proc_lock_path_ = workspace_ + kZKPathJobPLock;
-}
-
-ZKClusterRT::~ZKClusterRT() {
- // release callback vector
- for (RTCallback* p : cb_vec_) {
- delete p;
- }
-}
-
-bool ZKClusterRT::Init() {
- if (!zk_.Init(host_, timeout_)) return false;
- if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr))
- return false;
- return true;
-}
-
-int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
- char buf[kZKBufSize];
- string lock = proc_lock_path_ + "/lock-";
- if (!zk_.CreateNode(lock.c_str(), nullptr,
- ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
- return -1;
- }
- // get all children in lock folder
- vector<string> vt;
- if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) {
- return -1;
- }
- // find own position among all locks
- int id = -1;
- std::sort(vt.begin(), vt.end());
- for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
- if (proc_lock_path_+"/"+vt[i] == buf) {
- id = i;
- break;
- }
- }
- if (id == -1) {
- LOG(ERROR) << "cannot find own node " << buf;
- return -1;
- }
- // create a new node in proc path
- string path = proc_path_ + "/proc-" + to_string(id);
- string content = host_addr + "|" + to_string(pid);
- if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL,
- nullptr)) {
- return -1;
- }
- return id;
-}
-
-std::string ZKClusterRT::GetProcHost(int proc_id) {
- char val[kZKBufSize];
- // construct file name
- string path = proc_path_ + "/proc-" + to_string(proc_id);
- if (!zk_.GetNode(path.c_str(), val)) return "";
- int len = strlen(val) - 1;
- while (len && val[len] != '|') --len;
- CHECK(len);
- val[len] = '\0';
- return string(val);
-}
-
-bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
- CHECK_NOTNULL(fn);
- string path = groupPath(gid);
- // create zk node
- if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
- vector<string> child;
- // store the callback function and context for later usage
- RTCallback *cb = new RTCallback;
- cb->fn = fn;
- cb->ctx = ctx;
- cb_vec_.push_back(cb);
- // start to watch on the zk node, does not care about the first return value
- return zk_.WGetChild(path.c_str(), &child, cb);
-}
-
-bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
- string path = groupPath(s_group) + workerPath(gid, wid);
- // try to create an ephemeral node under server group path
- return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
-}
-
-bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
- string path = groupPath(s_group) + workerPath(gid, wid);
- return zk_.DeleteNode(path.c_str());
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/worker.cc
----------------------------------------------------------------------
diff --git a/src/worker.cc b/src/worker.cc
deleted file mode 100644
index e92d780..0000000
--- a/src/worker.cc
+++ /dev/null
@@ -1,545 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/worker.h"
-
-#include <glog/logging.h>
-#include <chrono>
-#include <thread>
-#include <typeinfo>
-#include "singa/utils/cluster.h"
-#include "singa/utils/factory.h"
-#include "singa/utils/singleton.h"
-#include "singa/utils/context.h"
-#include "singa/utils/math_blob.h"
-
-namespace singa {
-
-using std::string;
-
-Worker* Worker::CreateWorker(const string str) {
- AlgProto alg_proto;
- alg_proto.ParseFromString(str);
- return Worker::Create(alg_proto);
-}
-
-Worker* Worker::Create(const AlgProto& conf) {
- auto factory = Singleton<Factory<singa::Worker>>::Instance();
- Worker* worker = nullptr;
- if (conf.has_user_alg())
- worker = factory->Create(conf.user_alg());
- else
- worker = factory->Create(conf.alg());
- return worker;
-}
-
-void Worker::Setup(int grp_id, int id, const JobProto& conf,
- NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net) {
- grp_id_ = grp_id;
- id_ = id;
- job_conf_ = conf;
- train_net_ = train_net;
- val_net_ = val_net;
- test_net_ = test_net;
- InitSockets(train_net);
-}
-
-Worker::~Worker() {
- if (dealer_) delete dealer_;
- if (bridge_dealer_) delete bridge_dealer_;
-}
-
-void Worker::Run() {
- // setup gpu device
- auto context = Singleton<Context>::Instance();
- // TODO(wangwei) -2 for uninitial device; -1 for CPU; >=0 for GPU now.
- int device = -2;
- while (device == -2) {
- device = context->device_id(std::this_thread::get_id());
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- }
- LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") "
- << " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU");
- if (device >= 0)
- context->ActivateDevice(device);
-
- auto cluster = Cluster::Get();
- int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
- CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
- step_ = job_conf_.step();
- InitSockets(train_net_);
- InitNetParams(job_conf_, train_net_);
- while (!StopNow(step_)) {
- if (ValidateNow(step_) && val_net_ != nullptr) {
- CollectAll(step_, train_net_);
- LOG(ERROR) << "Validation @ step " + std::to_string(step_);
- Test(job_conf_.validate_steps(), kVal, val_net_);
- }
- if (TestNow(step_) && test_net_ != nullptr) {
- CollectAll(step_, train_net_);
- LOG(ERROR) << "Test @ step " + std::to_string(step_);
- Test(job_conf_.test_steps(), kTest, test_net_);
- }
- if (CheckpointNow(step_) && grp_id_ == 0) {
- CollectAll(step_, train_net_);
- Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
- job_conf_.set_step(step_);
- }
- TrainOneBatch(step_, train_net_);
- if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0) {
- Display(kTrain | kForward | kBackward,
- "Train @ step " + std::to_string(step_), train_net_);
- }
- step_++;
- }
-
- // save the model
- if (grp_id_ == 0)
- Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
- // clean up
- cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
- // notify the stub on worker stop
- Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
- msg->set_type(kStop);
- dealer_->Send(&msg); // use param dealer to send the stop msg
- LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
-}
-
-void Worker::Test(int steps, Phase phase, NeuralNet* net) {
- for (int step = 0; step < steps; step++)
- TestOneBatch(step, phase, net);
- Display(phase, " ", net);
-}
-
-void Worker::InitSockets(const NeuralNet* net) {
- dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerParam));
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_) {
- if (typeid(*layer) == typeid(BridgeDstLayer)
- || typeid(*layer) == typeid(BridgeSrcLayer)) {
- bridge_dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerLayer));
- break;
- }
- }
- }
- // bind dealer to bridge layers
- if (bridge_dealer_ != nullptr) {
- for (auto dst : net->layers()) {
- if (typeid(*dst) == typeid(BridgeDstLayer)) {
- auto src = net->srclayers(dst)[0];
- name2bridge_[src->name()] = src;
- name2bridge_[dst->name()] = dst;
- if (src->partition_id() == id_) {
- dynamic_cast<BridgeLayer*>(src)->MakePaired(dst, grp_id_,
- bridge_dealer_, &name2bridge_);
- }
- if (dst->partition_id() == id_) {
- dynamic_cast<BridgeLayer*>(dst)->MakePaired(src, grp_id_,
- bridge_dealer_, &name2bridge_);
- }
- }
- }
- }
-}
-
-void Worker::InitNetParams(const std::string& folder, vector<Layer*> net) {
-
- std::unordered_map<string, Param*> name2param;
- for (auto layer : net) {
- for (auto param : layer->GetParams()) {
- // only owners fill the memory of parameter values.
- //if (param->owner() == param->id()) {
- CHECK(name2param.find(param->name()) == name2param.end());
- name2param[param->name()] = param;
- //}
- }
- }
- vector<string> paths;
- paths.push_back(folder);
- NeuralNet::Load(paths, name2param);
-}
-
-void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) {
- // for each server grp, its first subscriber worker grp does the param init
- if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) {
- // extract params that should be initialized by this worker
- // must gen a name for each param if the user doesn't config it
- std::unordered_map<string, Param*> name2param;
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_) {
- for (auto param : layer->GetParams()) {
- // only owners fill the memory of parameter values.
- if (param->owner() == param->id()) {
- CHECK(name2param.find(param->name()) == name2param.end());
- name2param[param->name()] = param;
- }
- }
- }
- }
- vector<string> paths;
- for (const auto& p : job_conf_.checkpoint_path())
- paths.push_back(p);
- net->Load(paths, name2param);
- // init other params who do not have checkpoint version
- for (auto entry : name2param) {
- if (entry.second->version() > 0) {
- // if load from pre-training params, reset version to start step
- if (job_conf.reset_param_version()) {
- entry.second->set_version(job_conf.step());
- }
- } else {
- entry.second->InitValues(job_conf.step());
- if (!job_conf.reset_param_version())
- LOG(ERROR) << "better reset version of params from checkpoints "
- << "to the same as other newly initialized params!";
- }
- }
-
- // warmup training before put params to servers
- // for (; step_ < job_conf.warmup_steps(); step_++)
- // TrainOneBatch(step_, net);
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_)
- for (auto param : layer->GetParams())
- if (param->owner() == param->id())
- Put(param->version(), param);
- }
- }
- // wait owners in the same procs init params, then no get requests sent
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_)
- for (auto param : layer->GetParams())
- Get(job_conf.warmup_steps(), param);
- }
-}
-
-void Worker::Checkpoint(int step, const std::string& folder, vector<Layer*> net) {
- BlobProtos bps;
- for (auto layer : net) {
- //if (layer->partition_id() == id_) {
- for (auto param : layer->GetParams()) {
- // only owners fill the memory of parameter values.
- //if (param->owner() == param->id()) {
- auto *blob = bps.add_blob();
- param->ToProto(blob);
- bps.add_version(param->version());
- bps.add_name(param->name());
- //}
- }
- //}
- }
- char buf[256];
- snprintf(buf, sizeof(buf), "%s/step%d-worker0", folder.c_str(), step);
- LOG(INFO) << "checkpoint to " << buf;
- WriteProtoToBinaryFile(bps, buf);
-}
-
-void Worker::Checkpoint(int step, const std::string& folder, NeuralNet* net) {
- BlobProtos bps;
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_) {
- for (auto param : layer->GetParams()) {
- // only owners fill the memory of parameter values.
- if (param->owner() == param->id()) {
- auto *blob = bps.add_blob();
- param->ToProto(blob);
- bps.add_version(param->version());
- bps.add_name(param->name());
- }
- }
- }
- }
- char buf[256];
- snprintf(buf, sizeof(buf), "%s/step%d-worker%d", folder.c_str(), step, id_);
- LOG(INFO) << "checkpoint to " << buf;
- WriteProtoToBinaryFile(bps, buf);
-}
-
-int Worker::Put(int step, Param* param) {
- if (dealer_ == nullptr) {
- LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
- return 1;
- }
- // set Blob head to cpu to avoid calling cudaMemcpy by the stub thread, which
- // would hang on some machines.
- param->data().cpu_data();
- Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
- msg->set_trgt(ParamTrgt(param->owner(), 0), step);
- msg->set_type(kPut);
- dealer_->Send(&msg);
-// LOG(ERROR) << "worker msg " << msg;
- return 1;
-}
-
-int Worker::Get(int step, Param* param) {
- if (param->version() >= step)
- return 1;
- if (dealer_ == nullptr) {
- LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
- return 1;
- }
- // set Blob head to cpu to avoid calling cudaMemcpy by the stub thread, which
- // would hang on some machines.
- param->mutable_data()->mutable_cpu_data();
-
- Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
- msg->set_trgt(ParamTrgt(param->owner(), 0), step);
- msg->set_type(kGet);
- dealer_->Send(&msg);
- return 1;
-}
-
-// Sends the gradient of `param` to its owner (a kUpdate via the stub). Before
-// that, it records the current version as last_version; Collect() polls until
-// the version moves past it. Always returns 1.
-int Worker::Update(int step, Param* param) {
-  param->set_last_version(param->version());
-  // NOTE(review): on the null-dealer path no update is sent. A later Collect()
-  // on this param then keeps polling unless something else advances its
-  // version. Confirm that dealer_ is never null during training.
-  if (!dealer_) {
-    LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
-    return 1;
-  }
-  // The stub thread may use cudaMemcpy to copy gradients into msgs, and that
-  // call hangs when made from the stub thread on some GPU machines. Moving the
-  // grad head to cpu here avoids it.
-  // TODO(wangwei) fix this issue and remove the following line.
-  // optimize for training with single worker by removing stub and server, and
-  // updating parameters locally inside the worker GPU. Then we do not need to
-  // transfer gradients and parameter values between GPU-CPU.
-  param->grad().cpu_data();
-  // Move the data head to cpu as well. Otherwise the updated values would not
-  // be synced to gpu, because the head would still be at gpu.
-  param->mutable_data()->mutable_cpu_data();
-
-  const auto src = Addr(grp_id_, id_, kWorkerParam);
-  const auto dst = Addr(-1, -1, kStub);
-  Msg* msg = new Msg(src, dst);
-  msg->set_trgt(ParamTrgt(param->owner(), 0), step);
-  msg->set_type(kUpdate);
-  dealer_->Send(&msg);
-  return 1;
-}
-
-// Calls Collect() on every param of every layer owned by this worker
-// (partition_id() == id_), blocking until each has been refreshed.
-// Always returns 1.
-int Worker::CollectAll(int step, NeuralNet* net) {
-  for (auto& layer : net->layers()) {
-    if (layer->partition_id() != id_)
-      continue;
-    for (Param* p : layer->GetParams())
-      Collect(step, p);
-  }
-  return 1;
-}
-
-// Polls, sleeping kCollectSleepTime ms between checks, until param->version()
-// exceeds param->last_version() (the version recorded by Update()).
-// NOTE(review): there is no timeout, so this waits forever if the version
-// never advances. `step` is unused. Always returns 1.
-int Worker::Collect(int step, Param* param) {
-  const auto pause = std::chrono::milliseconds(kCollectSleepTime);
-  while (param->version() <= param->last_version())
-    std::this_thread::sleep_for(pause);
-  return 1;
-}
-
-void Worker::Display(int flag, const std::string& prefix, NeuralNet* net) {
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_) {
- const string& disp = layer->ToString(false, flag);
- if (disp.length())
- LOG(ERROR) << prefix << " " << disp;
- }
- }
-}
-
-/****************************BPWorker**********************************/
-// A single back-propagation training iteration: a kTrain forward pass, then
-// the backward pass (which sends the parameter updates).
-void BPWorker::TrainOneBatch(int step, NeuralNet* net) {
-  Forward(step, kTrain, net);
-  Backward(step, net);
-}
-
-// Evaluation runs only the forward pass, in the requested phase.
-void BPWorker::TestOneBatch(int step, Phase phase, NeuralNet* net) {
-  Forward(step, phase, net);
-}
-
-// Forward pass over this worker's layers (partition_id() == id_), in
-// net->layers() order. In the kTrain phase, a layer with unroll_index() == 0
-// first waits (Collect) for fresh param values. If job_conf_.debug() is set,
-// DisplayNow(step) holds and this is group 0, each layer's ToString output is
-// collected and written as a JSON graph to
-// <vis_folder>/fp-step<step>-loc<id_>.json.
-void BPWorker::Forward(int step, Phase phase, NeuralNet* net) {
-  map<string, string> label;
-  for (auto& layer : net->layers()) {
-    if (layer->partition_id() != id_)
-      continue;
-    if (phase == kTrain && layer->unroll_index() == 0) {
-      // Block until this layer's params have been refreshed.
-      for (Param* p : layer->GetParams())
-        Collect(step, p);
-    }
-    layer->ComputeFeature(phase | kForward, net->srclayers(layer));
-    if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-      label[layer->name()] = layer->ToString(true, phase | kForward);
-  }
-  if (!label.empty()) {
-    const string path = Cluster::Get()->vis_folder() + "/fp-step" +
-        std::to_string(step) + "-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).ToJson(label));
-  }
-}
-
-// Backward pass over this worker's layers (partition_id() == id_), visited in
-// reverse net->layers() order. Each layer computes its gradient with
-// kTrain | kBackward and then sends an Update for each of its params. Debug
-// output mirrors Forward() and goes to
-// <vis_folder>/bp-step<step>-loc<id_>.json, using the reversed graph.
-void BPWorker::Backward(int step, NeuralNet* net) {
-  map<string, string> label;
-  const auto& layers = net->layers();
-  for (auto it = layers.rbegin(); it != layers.rend(); ++it) {
-    Layer* layer = *it;
-    if (layer->partition_id() != id_)
-      continue;
-    layer->ComputeGradient(kTrain | kBackward, net->srclayers(layer));
-    if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-      label[layer->name()] = layer->ToString(true, kTrain | kBackward);
-    for (Param* p : layer->GetParams())
-      Update(step, p);
-  }
-  if (!label.empty()) {
-    const string path = Cluster::Get()->vis_folder() + "/bp-step" +
-        std::to_string(step) + "-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).Reverse().ToJson(label));
-  }
-}
-
-/***************************BPTTWorker*********************************/
-// Back-propagation-through-time forward pass over this worker's layers
-// (partition_id() == id_), visited in net->layers() order.
-// - kTrain phase, unroll_index() == 0: wait (Collect) for each param, then
-//   reset its gradient with Zero().
-// - Test phases: an RNNDummyLayer takes its input from the layer named by
-//   dummy->srclayer(step) when that layer exists. For step > 0 it must exist.
-// - full_state_ && !begin_: the first unrolled layer also receives
-//   net->last_unroll_layer(layer) as a source, so the last unit feeds the
-//   first one. Outside test phases this happens only when that is a
-//   different layer.
-// Debug output goes to <vis_folder>/fp-step<step>-loc<id_>.json.
-void BPTTWorker::Forward(int step, Phase phase, NeuralNet* net) {
- map<string, string> label;
- for (auto& layer : net->layers()) {
- if (layer->partition_id() == id_) {
- if (phase == kTrain && layer->unroll_index() == 0) {
- // wait until param is updated, then clear its grad. Backward() runs with
- // kAggGrad, presumably so the unrolled copies accumulate into this grad.
- for (Param* p : layer->GetParams()) {
- Collect(step, p);
- Zero(p->mutable_grad());
- }
- }
- vector<Layer*> src = net->srclayers(layer);
- // In test phases, re-point a dummy layer at the source chosen for this step.
- if ((phase & kTest) && typeid(*layer) == typeid(RNNDummyLayer)) {
- CHECK_LE(src.size(), 1);
- auto dummy = dynamic_cast<RNNDummyLayer*>(layer);
- Layer* srclayer = net->name2layer(dummy->srclayer(step));
- if (step > 0)
- CHECK(srclayer != nullptr);
- if (srclayer != nullptr) {
- src.clear();
- src.push_back(srclayer);
- }
- }
- // if full state rnn and not the starting of a new passing of the dataset,
- // feed the hidden state of the last unit to the first unit.
- if (layer->unroll_index() == 0 && full_state_ && !begin_) {
- Layer* last = net->last_unroll_layer(layer);
- CHECK(last != nullptr);
- if (last != layer || (phase & kTest))
- src.push_back(last);
- }
- // LOG(ERROR) << layer->name() << " forward";
- // NOTE(review): this function never sets begin_, because the assignment
- // below is disabled. The !begin_ test above therefore relies on whatever
- // else sets it.
- // int ret =
- layer->ComputeFeature(phase | kForward, src);
- /*
- if ((phase & Phase::kTrain) && ret == Status::kEnd)
- begin_ = true;
- */
- if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
- label[layer->name()] = layer->ToString(true, phase | kForward);
- }
- }
- if (label.size()) {
- const string path = Cluster::Get()->vis_folder() + "/fp-step"
- + std::to_string(step) +"-loc" + std::to_string(id_) + ".json";
- WriteStringToTextFile(path, net->ToGraph(false).ToJson(label));
- }
-}
-
-// BPTT backward pass over this worker's layers (partition_id() == id_), in
-// reverse net->layers() order. Gradients are computed with
-// kTrain | kBackward | kAggGrad. Unrolled layers share parameter data and
-// grad, so only the layer with unroll_index() == 0 sends Updates. Debug output
-// goes to <vis_folder>/bp-step<step>-loc<id_>.json, using the reversed graph.
-void BPTTWorker::Backward(int step, NeuralNet* net) {
-  map<string, string> label;
-  const auto& layers = net->layers();
-  for (auto it = layers.rbegin(); it != layers.rend(); ++it) {
-    Layer* layer = *it;
-    if (layer->partition_id() != id_)
-      continue;
-    layer->ComputeGradient(kTrain | kBackward | kAggGrad,
-                           net->srclayers(layer));
-    if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0)
-      label[layer->name()] = layer->ToString(true, kTrain | kBackward);
-    // Only the first unrolled copy pushes the shared params.
-    if (layer->unroll_index() != 0)
-      continue;
-    for (Param* p : layer->GetParams())
-      Update(step, p);
-  }
-  if (!label.empty()) {
-    const string path = Cluster::Get()->vis_folder() + "/bp-step" +
-        std::to_string(step) + "-loc" + std::to_string(id_) + ".json";
-    WriteStringToTextFile(path, net->ToGraph(false).Reverse().ToJson(label));
-  }
-}
-void BPTTWorker::Display(int flag, const std::string& prefix, NeuralNet* net) {
- std::unordered_map<string, float> perf;
- for (auto layer : net->layers()) {
- if (layer->partition_id() == id_) {
- const string& disp = layer->ToString(false, flag);
- for (const auto& entry : GetMetricFromString(disp))
- perf[entry.first] += entry.second;
- }
- }
- string disp = prefix + " ";
- for (const auto& entry : perf)
- disp += entry.first + " = " + std::to_string(entry.second) + ", ";
- LOG(ERROR) << disp;
-}
-/****************************CDWorker**********************************/
-// One contrastive-divergence training iteration:
-//   1. every layer waits (Collect) for its params, then runs kPositive;
-//   2. the RBM layers (RBMVisLayer / RBMHidLayer) run one kNegative | kTest
-//      pass, followed by cd_k() - 1 further kNegative passes;
-//   3. each RBM layer computes gradients with kTrain and sends Updates.
-// NOTE(review): unlike the BP workers, this does not filter layers by
-// partition_id(). It presumably assumes a single partition; confirm.
-void CDWorker::TrainOneBatch(int step, NeuralNet* net) {
-  auto is_rbm = [](Layer* l) {
-    return typeid(*l) == typeid(RBMVisLayer)
-        || typeid(*l) == typeid(RBMHidLayer);
-  };
-  const auto& layers = net->layers();
-  for (auto* layer : layers) {
-    // wait until param is updated
-    for (Param* p : layer->GetParams())
-      Collect(step, p);
-    layer->ComputeFeature(kPositive, net->srclayers(layer));
-  }
-  for (auto* layer : layers) {
-    if (is_rbm(layer))
-      layer->ComputeFeature(kNegative | kTest, net->srclayers(layer));
-  }
-  for (int i = 1; i < job_conf_.train_one_batch().cd_conf().cd_k(); i++) {
-    for (auto* layer : layers) {
-      if (is_rbm(layer))
-        layer->ComputeFeature(kNegative, net->srclayers(layer));
-    }
-  }
-  for (auto* layer : layers) {
-    if (!is_rbm(layer))
-      continue;
-    layer->ComputeGradient(kTrain, net->srclayers(layer));
-    for (Param* p : layer->GetParams())
-      Update(step, p);
-  }
-}
-
-// Evaluation for CD: a kPositive pass over every layer, then one
-// kNegative | kTest pass over the RBMVisLayer layers only. `step` and `phase`
-// are not used.
-void CDWorker::TestOneBatch(int step, Phase phase, NeuralNet* net) {
-  const auto& layers = net->layers();
-  for (auto* layer : layers)
-    layer->ComputeFeature(kPositive, net->srclayers(layer));
-  for (auto* layer : layers) {
-    if (typeid(*layer) != typeid(RBMVisLayer))
-      continue;
-    layer->ComputeFeature(kNegative | kTest, net->srclayers(layer));
-  }
-}
-
-} // namespace singa