You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/06/03 07:48:10 UTC
[05/60] incubator-singa git commit: SINGA-163 - Reorganize the project folder layout
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
deleted file mode 100644
index a9928eb..0000000
--- a/src/utils/cluster.cc
+++ /dev/null
@@ -1,131 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/cluster.h"
-
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <unistd.h>
-#include <fstream>
-
-namespace singa {
-using std::vector;
-
-Cluster* Cluster::Setup(int job, const SingaProto& singaConf,
- const ClusterProto& clusterConf) {
- Singleton<Cluster>::Instance()->Init(job, singaConf, clusterConf);
- return Singleton<Cluster>::Instance();
-}
-
-Cluster* Cluster::Get() {
- if (!Singleton<Cluster>::Instance()->nprocs_) {
- LOG(ERROR) << "The first call to Get should "
- << "provide the job conf path";
- }
- return Singleton<Cluster>::Instance();
-}
-
-void Cluster::Register(int pid, const std::string& endpoint) {
- procs_id_ = cluster_rt_->RegistProc(endpoint, pid);
- CHECK_GE(procs_id_, 0);
- CHECK_LT(procs_id_, nprocs());
- LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
- << " (pid = " << pid << ")";
-}
-
-void Cluster::Init(int job, const SingaProto& singaConf,
- const ClusterProto& clusterConf) {
- cluster_ = clusterConf;
- singa_ = singaConf;
- SetupFolders(clusterConf);
- if (server_worker_separate())
- nprocs_ = nworker_procs() + nserver_procs();
- else
- nprocs_ = std::max(nworker_procs(), nserver_procs());
-
- // locate the process id of every worker/server
- int ngrps = cluster_.nworker_groups();
- int grp_size = cluster_.nworkers_per_group();
- int procs = 0;
- for (int i = 0; i < ngrps; ++i) {
- for (int j = 0; j < grp_size; ++j) {
- procs = (i * grp_size + j) / cluster_.nworkers_per_procs();
- procs_ids_[Hash(i, j, kWorkerLayer)] = procs;
- procs_ids_[Hash(i, j, kWorkerParam)] = procs;
- }
- }
- int offset = cluster_.server_worker_separate() ? procs + 1 : 0;
- ngrps = cluster_.nserver_groups();
- grp_size = cluster_.nservers_per_group();
- for (int i = 0; i < ngrps; ++i) {
- for (int j = 0; j < grp_size; ++j) {
- procs_ids_[Hash(i, j, kServer)] =
- (i * grp_size + j) / cluster_.nservers_per_procs() + offset;
- }
- }
- // cluster_rt_ = new ZKClusterRT(singa_.zookeeper_host(), job);
- // cluster_rt_ = new SPClusterRT();
- cluster_rt_ = ClusterRuntime::Create(singa_.zookeeper_host(), job);
- cluster_rt_->Init();
-}
-
-void Cluster::SetupFolders(const ClusterProto &cluster) {
- // create visulization folder
- mkdir(vis_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
- // create checkpoint folder
- mkdir(checkpoint_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-}
-
-const vector<int> Cluster::ExecutorRng(int pid, int grp_size, int procs_size) {
- int gstart, gend, start, end;
- if (grp_size >= procs_size) {
- // all workers in this procs are from the same group
- gstart = pid * procs_size / grp_size;
- gend = gstart + 1;
- start = pid * procs_size % grp_size;
- end = start + procs_size;
- } else {
- // there are multiple (complete) groups in this procs.
- CHECK_EQ(procs_size % grp_size, 0);
- int groups_per_procs = procs_size / grp_size;
- gstart = pid * groups_per_procs;
- gend = (pid+1) * groups_per_procs;
- start = 0;
- end = grp_size;
- }
- return vector<int>{gstart, gend, start, end};
-}
-
-int Cluster::Hash(int gid, int id, int flag) {
- int ret = -1;
- if (flag == kServer) {
- ret = kServer * cluster_.nworker_groups()
- * cluster_.nworkers_per_group()
- + (cluster_.nserver_groups() + gid)
- * cluster_.nservers_per_group() + id;
- } else {
- ret = (flag * cluster_.nworker_groups() + gid)
- * cluster_.nworkers_per_group() + id;
- }
- return ret;
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
deleted file mode 100644
index 9a7b8bd..0000000
--- a/src/utils/cluster_rt.cc
+++ /dev/null
@@ -1,110 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/cluster_rt.h"
-
-#include <glog/logging.h>
-#include <google/protobuf/text_format.h>
-#include <stdlib.h>
-#include <algorithm>
-#include <fstream>
-#include <iostream>
-#include "singa/proto/job.pb.h"
-
-#ifdef USE_ZOOKEEPER
-#include "singa/utils/zk_service.h"
-#endif
-
-using std::string;
-using std::to_string;
-using std::vector;
-
-namespace singa {
-
-ClusterRuntime* ClusterRuntime::Create(const std::string&host, int job_id) {
-#ifdef USE_ZOOKEEPER
- return new ZKClusterRT(host, job_id);
-#else
- return new SPClusterRT();
-#endif
-}
-
-SPClusterRT::~SPClusterRT() {
- // release callback vector
- for (auto list : grp_callbacks_)
- for (RTCallback* p : list.second) {
- delete p;
- }
-}
-
-bool SPClusterRT::Init() {
- return true;
-}
-
-int SPClusterRT::RegistProc(const string& host_addr, int pid) {
- int ret;
- lock_.lock();
- proc_list_.push_back(host_addr + std::to_string(pid));
- ret = proc_list_.size()-1;
- lock_.unlock();
- return ret;
-}
-
-string SPClusterRT::GetProcHost(int proc_id) {
- if (proc_list_.size() < (unsigned)proc_id + 1) return "";
- return proc_list_[proc_id];
-}
-
-bool SPClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) {
- // store the callback function and context for later usage
- RTCallback *cb = new RTCallback;
- cb->fn = fn;
- cb->ctx = ctx;
- lock_.lock();
- if (grp_callbacks_.count(gid) == 0)
- grp_callbacks_[gid] = vector<RTCallback*>{};
- grp_callbacks_[gid].push_back(cb);
- lock_.unlock();
- return true;
-}
-
-bool SPClusterRT::JoinSGroup(int gid, int wid, int s_group) {
- lock_.lock();
- if (grp_count_.count(gid) == 0)
- grp_count_[gid] = 0;
- grp_count_[gid]++;
- lock_.unlock();
- return true;
-}
-
-bool SPClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
- lock_.lock();
- if (--grp_count_[gid] == 0) {
- for (RTCallback* cb : grp_callbacks_[gid]) {
- (*cb->fn)(cb->ctx);
- cb->fn = nullptr;
- }
- }
- lock_.unlock();
- return true;
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/common.cc
----------------------------------------------------------------------
diff --git a/src/utils/common.cc b/src/utils/common.cc
index bd0fee5..d1a7d2c 100644
--- a/src/utils/common.cc
+++ b/src/utils/common.cc
@@ -1,574 +1,27 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
/**
- * The code is adapted from Caffe under BSD 2 Clause license.
- * All contributions by the University of California:
- * Copyright (c) 2014, The Regents of the University of California (Regents)
- * All rights reserved.
- * All other contributions:
- * Copyright (c) 2014, the respective contributors
- * All rights reserved.
- * Caffe uses a shared copyright model: each contributor holds copyright over
- * their contributions to Caffe. The project versioning records all such
- * contribution and copyright details. If a contributor wants to further mark
- * their specific copyright on a particular contribution, they should indicate
- * their copyright solely in the commit message of the change when it is
- * committed.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-#include "singa/utils/common.h"
-
-#include <sys/ioctl.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-
-#include <netinet/in.h>
-#include <net/if.h>
-#include <arpa/inet.h>
-
-#include <stdarg.h>
-#include <stdio.h>
-#include <time.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <cfloat>
-
-#include <fstream>
-
-#include <glog/logging.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-#include <google/protobuf/text_format.h>
namespace singa {
-const int kBufLen = 1024;
-
-string IntVecToString(const vector<int>& vec) {
- string disp = "(";
- for (int x : vec)
- disp += std::to_string(x) + ", ";
- return disp + ")";
-}
-
-/**
- * * Formatted string.
- * */
-string VStringPrintf(string fmt, va_list l) {
- char buffer[4096];
- vsnprintf(buffer, sizeof(buffer), fmt.c_str(), l);
- return string(buffer);
-}
-
-/**
- * * Formatted string.
- * */
-string StringPrintf(string fmt, ...) {
- va_list l;
- va_start(l, fmt); // fmt.AsString().c_str());
- string result = VStringPrintf(fmt, l);
- va_end(l);
- return result;
-}
-
-int ArgPos(int argc, char** arglist, const char* arg) {
- for (int i = 0; i < argc; i++) {
- if (strcmp(arglist[i], arg) == 0) {
- return i;
- }
- }
- return -1;
-}
-
-void CreateFolder(const string name) {
- struct stat buffer;
- if (stat(name.c_str(), &buffer) != 0) {
- mkdir(name.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
- CHECK_EQ(stat(name.c_str(), &buffer), 0);
- }
-}
-
-const vector<vector<int>> Slice(int num, const vector<int>& sizes) {
- vector<vector<int>> slices;
- if (num == 0)
- return slices;
- int avg = 0;
- for (int x : sizes)
- avg += x;
- avg = avg / num + avg % num;
- int diff = avg / 10;
- // DLOG(INFO) << "Slicer, param avg = " << avg << ", diff = " << diff;
-
- int capacity = avg, nbox = 0;
- for (int x : sizes) {
- vector<int> slice;
- string slicestr = "";
- while (x > 0) {
- int size = 0;
- if (capacity >= x) {
- capacity -= x;
- size = x;
- x = 0;
- } else if (capacity + diff >= x) {
- size = x;
- x = 0;
- capacity = 0;
- } else if (capacity >= diff) {
- x -= capacity;
- size = capacity;
- capacity = avg;
- nbox++;
- } else {
- capacity = avg;
- nbox++;
- }
- if (size) {
- slice.push_back(size);
- slicestr += ", " + std::to_string(size);
- }
- }
- // DLOG(INFO) << slicestr;
- slices.push_back(slice);
- }
- CHECK_LE(nbox, num);
- return slices;
-}
-
-const vector<int> PartitionSlices(int num, const vector<int>& slices) {
- vector<int> slice2box;
- if (num == 0)
- return slice2box;
- int avg = 0;
- for (int x : slices)
- avg += x;
- avg = avg / num + avg % num;
- int box = avg, boxid = 0, diff = avg / 10;
- for (auto it = slices.begin(); it != slices.end();) {
- int x = *it;
- if (box >= x) {
- box -= x;
- slice2box.push_back(boxid);
- it++;
- } else if (box + diff >= x) {
- slice2box.push_back(boxid);
- it++;
- box = 0;
- } else {
- box = avg;
- boxid++;
- }
- }
- CHECK_EQ(slice2box.size(), slices.size());
- int previd = -1;
- string disp;
- for (size_t i = 0; i < slice2box.size(); i++) {
- if (previd != slice2box[i]) {
- previd = slice2box[i];
- disp += " box = " +std::to_string(previd) + ":";
- }
- disp += " " + std::to_string(slices[i]);
- }
- return slice2box;
-}
-
-int gcd(int a, int b) {
- for (;;) {
- if (a == 0) return b;
- b %= a;
- if (b == 0) return a;
- a %= b;
- }
-}
-
-int LeastCommonMultiple(int a, int b) {
- int temp = gcd(a, b);
- return temp ? (a / temp * b) : 0;
-}
-
-string GetHostIP() {
- int fd;
- struct ifreq ifr;
- fd = socket(AF_INET, SOCK_DGRAM, 0);
- /* I want to get an IPv4 IP address */
- ifr.ifr_addr.sa_family = AF_INET;
- /* I want IP address attached to "eth0" */
- strncpy(ifr.ifr_name, "eth0", IFNAMSIZ-1);
- ioctl(fd, SIOCGIFADDR, &ifr);
- close(fd);
- string ip(inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr));
- /* display result */
- LOG(INFO) << "Host IP = " << ip;
- return ip;
-}
-
-void SetupLog(const string& log_dir, const string& model) {
- // TODO(wangwei) check if NFS, then create folder using script, otherwise
- // may have problems due to multiple processes create the same folder.
- CreateFolder(log_dir);
- string warn = log_dir + "/" + model + "-warn-";
- string info = log_dir + "/" + model + "-info-";
- string error = log_dir + "/" + model + "-error-";
- string fatal = log_dir + "/" + model + "-fatal-";
- google::SetLogDestination(google::WARNING, warn.c_str());
- google::SetLogDestination(google::INFO, info.c_str());
- google::SetLogDestination(google::ERROR, error.c_str());
- google::SetLogDestination(google::FATAL, fatal.c_str());
-}
-
-Metric::Metric(const string& str) {
- ParseFrom(str);
-}
-
-void Metric::Add(const string& name, float value) {
- Add(name, value, 1);
-}
-void Metric::Add(const string& name, float value, int count) {
- if (entry_.find(name) == entry_.end()) {
- entry_[name] = std::make_pair(1, value);
- } else {
- auto& e = entry_.at(name);
- e.first += count;
- e.second += value;
- }
-}
-
-void Metric::Reset() {
- for (auto& e : entry_) {
- e.second.first = 0;
- e.second.second = 0;
- }
-}
-
-string Metric::ToLogString() const {
- string ret;
- size_t k = 0;
- for (auto e : entry_) {
- ret += e.first + " = ";
- ret += std::to_string(e.second.second / e.second.first);
- if (++k < entry_.size())
- ret += ", ";
- }
- return ret;
-}
-
-string Metric::ToString() const {
- MetricProto proto;
- for (auto e : entry_) {
- proto.add_name(e.first);
- proto.add_count(e.second.first);
- proto.add_val(e.second.second);
- }
- string ret;
- proto.SerializeToString(&ret);
- return ret;
-}
-
-void Metric::ParseFrom(const string& msg) {
- MetricProto proto;
- proto.ParseFromString(msg);
- Reset();
- for (int i = 0; i < proto.name_size(); i++) {
- entry_[proto.name(i)] = std::make_pair(proto.count(i), proto.val(i));
- }
-}
-
-
-/*************Below functions are adapted from Caffe ************/
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::FileInputStream;
-using google::protobuf::io::FileOutputStream;
-using google::protobuf::io::ZeroCopyInputStream;
-
-
-void Im2col(const float* data_im, const int channels,
- const int height, const int width, const int kernel_h, const int kernel_w,
- const int pad_h, const int pad_w, const int stride_h, const int stride_w,
- float* data_col) {
- int height_col = (height + 2 * pad_h - kernel_h) / stride_h + 1;
- int width_col = (width + 2 * pad_w - kernel_w) / stride_w + 1;
- int channels_col = channels * kernel_h * kernel_w;
- for (int c = 0; c < channels_col; ++c) {
- int w_offset = c % kernel_w;
- int h_offset = (c / kernel_w) % kernel_h;
- int c_im = c / kernel_h / kernel_w;
- for (int h = 0; h < height_col; ++h) {
- for (int w = 0; w < width_col; ++w) {
- int h_pad = h * stride_h - pad_h + h_offset;
- int w_pad = w * stride_w - pad_w + w_offset;
- if (h_pad >= 0 && h_pad < height && w_pad >= 0 && w_pad < width)
- data_col[(c * height_col + h) * width_col + w] =
- data_im[(c_im * height + h_pad) * width + w_pad];
- else
- data_col[(c * height_col + h) * width_col + w] = 0;
- }
- }
- }
-}
-
-void Col2im(const float* data_col, const int channels,
- const int height, const int width, const int patch_h, const int patch_w,
- const int pad_h, const int pad_w, const int stride_h, const int stride_w,
- float* data_im) {
- memset(data_im, 0, height * width * channels * sizeof(float));
- int height_col = (height + 2 * pad_h - patch_h) / stride_h + 1;
- int width_col = (width + 2 * pad_w - patch_w) / stride_w + 1;
- int channels_col = channels * patch_h * patch_w;
- for (int c = 0; c < channels_col; ++c) {
- int w_offset = c % patch_w;
- int h_offset = (c / patch_w) % patch_h;
- int c_im = c / patch_h / patch_w;
- for (int h = 0; h < height_col; ++h) {
- for (int w = 0; w < width_col; ++w) {
- int h_pad = h * stride_h - pad_h + h_offset;
- int w_pad = w * stride_w - pad_w + w_offset;
- if (h_pad >= 0 && h_pad < height && w_pad >= 0 && w_pad < width)
- data_im[(c_im * height + h_pad) * width + w_pad] +=
- data_col[(c * height_col + h) * width_col + w];
- }
- }
- }
-}
-
-void ForwardMaxPooling(const float* bottom, const int num, const int channels,
- const int height, const int width, const int kernel_h, const int kernel_w,
- const int pad_h, const int pad_w, const int stride_h, const int stride_w,
- float* top, float* mask) {
- int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1;
- int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1;
- int top_count = num * top_height * top_width * channels;
- for (int i = 0; i < top_count; i++) {
- mask[i] = -1;
- top[i] = -FLT_MAX;
- }
- const int bottom_offset = height * width;
- const int top_offset = top_height * top_width;
- // The main loop
- for (int n = 0; n < num; ++n) {
- for (int c = 0; c < channels; ++c) {
- for (int ph = 0; ph < top_height; ++ph) {
- for (int pw = 0; pw < top_width; ++pw) {
- int hstart = ph * stride_h - pad_h;
- int wstart = pw * stride_w - pad_w;
- int hend = std::min(hstart + kernel_h, height);
- int wend = std::min(wstart + kernel_w, width);
- hstart = std::max(hstart, 0);
- wstart = std::max(wstart, 0);
- const int top_index = ph * top_width + pw;
- for (int h = hstart; h < hend; ++h) {
- for (int w = wstart; w < wend; ++w) {
- const int index = h * width + w;
- if (bottom[index] > top[top_index]) {
- top[top_index] = bottom[index];
- mask[top_index] = index;
- }
- }
- }
- }
- }
- // compute offset
- bottom += bottom_offset;
- top += top_offset;
- mask += top_offset;
- }
- }
-}
-
-void BackwardMaxPooling(const float* top, const float* mask, const int num,
- const int channels, const int height, const int width,
- const int kernel_h, const int kernel_w, const int pad_h, const int pad_w,
- const int stride_h, const int stride_w,
- float* bottom) {
- int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1;
- int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1;
- const int top_offset = top_height * top_width;
- const int bottom_offset = height * width;
- memset(bottom, 0, sizeof(float) * num * channels * bottom_offset);
- for (int n = 0; n < num; ++n) {
- for (int c = 0; c < channels; ++c) {
- for (int ph = 0; ph < top_height; ++ph) {
- for (int pw = 0; pw < top_width; ++pw) {
- const int top_idx = ph * top_width + pw;
- const int bottom_idx = static_cast<int>(mask[top_idx]);
- bottom[bottom_idx] += top[top_idx];
- }
- }
- top += top_offset;
- mask += top_offset;
- bottom += bottom_offset;
- }
- }
-}
-
-void ForwardAvgPooling(const float* bottom, const int num, const int channels,
- const int height, const int width, const int kernel_h, const int kernel_w,
- const int pad_h, const int pad_w, const int stride_h, const int stride_w,
- float* top) {
- int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1;
- int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1;
- int top_count = num * top_height * top_width * channels;
- for (int i = 0; i < top_count; i++) {
- top[i] = 0;
- }
- const int bottom_offset = height * width;
- const int top_offset = top_height * top_width;
- // The main loop
- for (int n = 0; n < num; ++n) {
- for (int c = 0; c < channels; ++c) {
- for (int ph = 0; ph < top_height; ++ph) {
- for (int pw = 0; pw < top_width; ++pw) {
- int hstart = ph * stride_h - pad_h;
- int wstart = pw * stride_w - pad_w;
- int hend = std::min(hstart + kernel_h, height+pad_h);
- int wend = std::min(wstart + kernel_w, width+pad_w);
- int pool_size = (hend-hstart) * (wend-wstart);
- hstart = std::max(hstart, 0);
- wstart = std::max(wstart, 0);
- hend = std::min(hend, height);
- wend = std::min(wend, width);
- const int top_index = ph * top_width + pw;
- for (int h = hstart; h < hend; ++h) {
- for (int w = wstart; w < wend; ++w) {
- const int index = h * width + w;
- top[top_index] += bottom[index];
- }
- }
- top[top_index] /= pool_size;
- }
- }
- // compute offset
- bottom += bottom_offset;
- top += top_offset;
- }
- }
-}
-
-void BackwardAvgPooling(const float* top, const int num, const int channels,
- const int height, const int width, const int kernel_h, const int kernel_w,
- const int pad_h, const int pad_w, const int stride_h, const int stride_w,
- float* bottom) {
- int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1;
- int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1;
- const int top_offset = top_height * top_width;
- const int bottom_offset = height * width;
- memset(bottom, 0, sizeof(float) * num * channels * bottom_offset);
- for (int n = 0; n < num; ++n) {
- for (int c = 0; c < channels; ++c) {
- for (int ph = 0; ph < top_height; ++ph) {
- for (int pw = 0; pw < top_width; ++pw) {
- int hstart = ph * stride_h - pad_h;
- int wstart = pw * stride_w - pad_w;
- int hend = std::min(hstart + kernel_h, height+pad_h);
- int wend = std::min(wstart + kernel_w, width+pad_w);
- int pool_size = (hend-hstart) * (wend-wstart);
- hstart = std::max(hstart, 0);
- wstart = std::max(wstart, 0);
- hend = std::min(hend, height);
- wend = std::min(wend, width);
- const int top_index = ph * top_width + pw;
- for (int h = hstart; h < hend; ++h) {
- for (int w = wstart; w < wend; ++w) {
- const int index = h * width + w;
- bottom[index] += top[top_index] / pool_size;
- }
- }
- }
- }
- top += top_offset;
- bottom += bottom_offset;
- }
- }
-}
-
-void ReadProtoFromTextFile(const char* filename, Message* proto) {
- int fd = open(filename, O_RDONLY);
- CHECK_NE(fd, -1) << "File not found: " << filename;
- FileInputStream* input = new FileInputStream(fd);
- CHECK(google::protobuf::TextFormat::Parse(input, proto));
- delete input;
- close(fd);
-}
-
-void WriteProtoToTextFile(const Message& proto, const char* filename) {
- int fd = open(filename, O_WRONLY | O_CREAT, 0644);
- FileOutputStream* output = new FileOutputStream(fd);
- CHECK(google::protobuf::TextFormat::Print(proto, output));
- delete output;
- close(fd);
-}
-
-void ReadProtoFromBinaryFile(const char* filename, Message* proto) {
- int fd = open(filename, O_RDONLY);
- CHECK_NE(fd, -1) << "File not found: " << filename;
- ZeroCopyInputStream* raw_input = new FileInputStream(fd);
- CodedInputStream* coded_input = new CodedInputStream(raw_input);
- // upper limit 512MB, warning threshold 256MB
- coded_input->SetTotalBytesLimit(536870912, 268435456);
- CHECK(proto->ParseFromCodedStream(coded_input));
- delete coded_input;
- delete raw_input;
- close(fd);
-}
-
-void WriteProtoToBinaryFile(const Message& proto, const char* filename) {
- int fd = open(filename, O_CREAT|O_WRONLY|O_TRUNC, 0644);
- CHECK_NE(fd, -1) << "File cannot open: " << filename;
- CHECK(proto.SerializeToFileDescriptor(fd));
-}
-void WriteStringToTextFile(const string& filename, const string& context) {
- std::ofstream ofs;
- ofs.open(filename);
- CHECK(ofs.is_open()) << "Can't write to file: " << filename;
- ofs << context;
- ofs.flush();
- ofs.close();
-}
-const vector<std::pair<string, float>> GetMetricFromString(const string& disp) {
- size_t pos = 0;
- vector<string> terms;
- while (pos != string::npos) {
- auto next = disp.find_first_of(" ,", pos); // delimiter: space or comma
- if (next != string::npos) {
- terms.push_back(disp.substr(pos, next - pos));
- pos = disp.find_first_not_of(" ,", next + 1);
- } else {
- break;
- }
- }
- if (pos != string::npos)
- terms.push_back(disp.substr(pos));
- vector<std::pair<string, float>> ret;
- for (unsigned i = 0; i < terms.size(); i++) {
- if (terms[i] == "=") {
- CHECK_GE(i, 1);
- CHECK_LT(i, terms.size() - 1) << "terms[i] = " << terms[i];
- ret.push_back(std::make_pair(terms[i-1], std::stof(terms[i + 1])));
- }
- }
- return ret;
-}
-} // namespace singa
+} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/graph.cc
----------------------------------------------------------------------
diff --git a/src/utils/graph.cc b/src/utils/graph.cc
deleted file mode 100644
index 4f59635..0000000
--- a/src/utils/graph.cc
+++ /dev/null
@@ -1,273 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/graph.h"
-
-#include <glog/logging.h>
-#include <algorithm>
-#include <queue>
-#include <unordered_set>
-#include "singa/utils/common.h"
-
-namespace singa {
-
-using std::map;
-using std::string;
-using std::vector;
-
-/**************************************************************************
- * Implementation for Node class
- *************************************************************************/
-Node::Node(string name) {
- this->name = name;
-}
-Node::Node(string name, const std::map<string, string>& attrs) {
- this->name = name;
- this->attrs = attrs;
-}
-
-Node::Node(const string& name, const string& origin, int id, void* proto) {
- this->name = name;
- this->origin = origin;
- this->proto = proto;
- this->partition_id = id;
-}
-
-void Node::AddDstNode(Node* dstnode) {
- dstnodes.push_back(dstnode);
-}
-
-void Node::AddSrcNode(Node* srcnode) {
- srcnodes.push_back(srcnode);
-}
-
-void Node::RemoveDstNode(Node* dst) {
- auto iter = dstnodes.begin();
- while ((*iter)->name != dst->name && iter != dstnodes.end())
- iter++;
- CHECK_STREQ((*iter)->name.c_str(), dst->name.c_str());
- dstnodes.erase(iter);
-}
-
-void Node::RemoveSrcNode(Node* src) {
- auto iter = srcnodes.begin();
- while ((*iter)->name != src->name && iter != srcnodes.end())
- iter++;
- CHECK_STREQ((*iter)->name.c_str(), src->name.c_str());
- srcnodes.erase(iter);
-}
-
-/****************************************************************************
- * Implementation for Graph class
- ****************************************************************************/
-
-Graph::~Graph() {
- for (Node* node : nodes_)
- delete node;
-}
-
-Node* Graph::AddNode(const string& name, const string& origin, int id,
- void* proto) {
- Node* node = new Node(name, origin, id, proto);
- nodes_.push_back(node);
- CHECK(name2node_.find(node->name) == name2node_.end())
- << "node " << node->name << " already exists";
- name2node_[node->name] = node;
- return node;
-}
-
-Node* Graph::AddNode(const string& name,
- const std::map<string, string>& attrs) {
- Node* node = new Node(name, attrs);
- nodes_.push_back(node);
- CHECK(name2node_.find(node->name) == name2node_.end())
- << "node " << node->name << " already exists";
- name2node_[node->name] = node;
- return node;
-}
-
-void Graph::AddEdge(Node* srcnode, Node* dstnode) {
- srcnode->AddDstNode(dstnode);
- dstnode->AddSrcNode(srcnode);
-}
-
-void Graph::AddEdge(const string& src, const string& dst) {
- auto srcnode = name2node_.find(src);
- CHECK(srcnode != name2node_.end()) << "can't find src node " << src;
- auto dstnode = name2node_.find(dst);
- CHECK(dstnode != name2node_.end()) << "can't find dst node " << dst;
- AddEdge(srcnode->second, dstnode->second);
-}
-void Graph::AddEdge(Node* srcnode, Node* dstnode,
- const std::map<string, string>& attrs) {
- AddEdge(srcnode, dstnode);
- edge_attrs_[GetEdgeName(srcnode->name, dstnode->name)] = attrs;
-}
-void Graph::AddEdge(const string& src, const std::string& dst,
- const std::map<string, string>& attrs) {
- AddEdge(src, dst);
- edge_attrs_[GetEdgeName(src, dst)] = attrs;
-}
-
-void Graph::RemoveEdge(Node* src, Node* dst) {
- src->RemoveDstNode(dst);
- dst->RemoveSrcNode(src);
-}
-
-void Graph::RemoveEdge(const string &src, const string& dst) {
- auto srcnode = name2node_.find(src);
- CHECK(srcnode != name2node_.end()) << "can't find src node " << src;
- auto dstnode = name2node_.find(dst);
- CHECK(dstnode != name2node_.end()) << "can't find dst node " << dst;
- RemoveEdge(srcnode->second, dstnode->second);
-}
-
-// sort to make `bottom' nodes be placed in the front positions
-void Graph::Sort() {
- // nodes to be visited
- std::queue<Node*> visiting_nodes;
- // visited node set
- std::unordered_set<Node*> visited_set;
- // visiting_nodes + visted_set
- std::unordered_set<Node*> visit_set;;
- for (auto node : nodes_) {
- // visit nodes without source nodes firstly
- if (node->srcnodes.size() == 0) {
- visiting_nodes.push(node);
- visit_set.insert(node);
- }
- }
- int n = nodes_.size();
- nodes_.clear();
- while (!visiting_nodes.empty()) {
- auto node = visiting_nodes.front();
- visiting_nodes.pop();
- bool visit = true;
- bool bi_direction = false;
- // check if a node has a bi-direction edge with its neighbour
- for (auto src : node->srcnodes)
- for (auto src_of_src : src->srcnodes)
- if (strcmp((src_of_src->name).c_str(), (node->name).c_str()) == 0) {
- bi_direction = true;
- break;
- }
- // check whether its src nodes number greater than 1
- if (bi_direction && (node->srcnodes).size() > 1) {
- auto src = node->srcnodes.at(0);
- if (visited_set.find(src) == visited_set.end()) {
- visit = false;
- }
- } else {
- for (auto src : node->srcnodes)
- if (visited_set.find(src) == visited_set.end()) {
- visit = false;
- break;
- }
- }
- if (visit) {
- nodes_.push_back(node);
- visited_set.insert(node);
- for (auto dst : node->dstnodes) {
- // queueing the dst node if it is not queued before
- if (visit_set.find(dst) == visit_set.end()) {
- visiting_nodes.push(dst);
- visit_set.insert(dst);
- }
- }
- } else {
- visiting_nodes.push(node);
- }
- }
- CHECK_EQ(nodes_.size(), n);
-}
-
-const Graph Graph::Reverse() const {
- Graph g;
- for (Node* n : nodes_)
- g.AddNode(n->name, n->attrs);
- for (Node* src : nodes_)
- for (Node* dst : src->dstnodes) {
- map<string, string> attrs;
- const string edge = GetEdgeName(src->name, dst->name);
- if (edge_attrs_.find(edge) != edge_attrs_.end())
- attrs = edge_attrs_.at(edge);
- g.AddEdge(dst->name, src->name, attrs);
- }
- return g;
-}
-string Graph::ToJson() const {
- map<string, string> label;
- return ToJson(label);
-}
-
-
-string Graph::ToJson(const map<string, string>& label) const {
- string disp = "{\"directed\":1,\n";
-
- // add nodes
- disp += "\"nodes\":[\n";
-
- bool first = true;
- map<string, int> node_id;
- int id = 0;
- for (auto node : nodes_) {
- string name = node->name;
- string lbl = name + " -- ";
- if (label.find(name) != label.end())
- lbl += label.at(name);
- if (node->attrs.find("label") != node->attrs.end())
- lbl += node->attrs.at("label");
- disp += StringPrintf("%c{\"id\":\"%s\", \"label\":\"%s\"",
- !first ? ',' : ' ', name.c_str(), lbl.c_str());
- for (const auto& attr : node->attrs)
- if (attr.first != "label")
- disp += StringPrintf(", \"%s\":\"%s\"",
- attr.first.c_str(), attr.second.c_str());
- disp += "}\n";
- first = false;
- node_id[name] = id++;
- }
- disp += "]\n,\n";
-
- // add edges
- disp += "\"links\":[\n";
- first = true;
- for (auto src : nodes_) {
- for (auto dst : src->dstnodes) {
- const string edge_name = GetEdgeName(src->name, dst->name);
- string lbl = "";
- if (label.find(edge_name) != label.end())
- lbl = label.at(edge_name);
- disp += StringPrintf("%c{\"source\":%d, \"target\":%d, \"label\": \"%s\"",
- !first ? ',' : ' ', node_id[src->name], node_id[dst->name],
- lbl.c_str());
- if (edge_attrs_.find(edge_name) != edge_attrs_.end()) {
- for (const auto& attr : edge_attrs_.at(edge_name))
- disp += StringPrintf(", \"%s\":\"%s\"",
- attr.first.c_str(), attr.second.c_str());
- }
- disp += "}\n";
- first = false;
- }
- }
- return disp + "]}";
-}
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/image_transform.cc
----------------------------------------------------------------------
diff --git a/src/utils/image_transform.cc b/src/utils/image_transform.cc
deleted file mode 100644
index 28d5f4c..0000000
--- a/src/utils/image_transform.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include "singa/utils/image_transform.h"
-
-namespace singa {
-
-void ImageTransform(const float* in, const float* mean, bool mirror, int h_crop,
- int w_crop, int h_offset, int w_offset, int channel, int height, int width,
- float scale, float* out) {
- if (h_crop == 0) {
- CHECK_EQ(h_offset, 0);
- h_crop = height;
- }
- if (w_crop ==0) {
- CHECK_EQ(w_offset, 0);
- w_crop = width;
- }
- CHECK_NE(scale, 0);
-
- int out_idx = 0, in_idx = 0;
- for (int c = 0; c < channel; c++) {
- for (int h = 0; h < h_crop; h++) {
- for (int w = 0; w < w_crop; w++) {
- in_idx = (c * height + h_offset + h) * width + w_offset + w;
- if (mirror) {
- out_idx = (c * h_crop + h) * w_crop + (w_crop - 1 - w);
- } else {
- out_idx = (c * h_crop + h) * w_crop + w;
- }
- out[out_idx] = in[in_idx];
- if (mean != nullptr)
- out[out_idx] -= mean[in_idx];
- out[out_idx] *= scale;
- }
- }
- }
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/job_manager.cc
----------------------------------------------------------------------
diff --git a/src/utils/job_manager.cc b/src/utils/job_manager.cc
deleted file mode 100644
index 2ea5b1b..0000000
--- a/src/utils/job_manager.cc
+++ /dev/null
@@ -1,271 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/job_manager.h"
-
-#include <glog/logging.h>
-#include <google/protobuf/text_format.h>
-#include <stdlib.h>
-#include <algorithm>
-#include <fstream>
-#include <iostream>
-#include "singa/proto/job.pb.h"
-
-using std::string;
-using std::vector;
-
-namespace singa {
-
-JobManager::JobManager(const string& host) {
- host_ = host;
-}
-
-bool JobManager::Init() {
-#ifdef USE_ZOOKEEPER
- if (!zk_.Init(host_, timeout_)) return false;
- if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
- return false;
- if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr))
- return false;
- if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
- return false;
-#endif
- return true;
-}
-
-bool JobManager::GenerateJobID(int* id) {
-#ifdef USE_ZOOKEEPER
- char buf[kZKBufSize];
- string lock = kZKPathJLock + "/lock-";
- if (!zk_.CreateNode(lock.c_str(), nullptr,
- ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
- return false;
- }
- *id = atoi(buf + strlen(buf) - 10);
-#else
- *id = 0;
-#endif
- return true;
-}
-
-bool JobManager::GenerateHostList(const char* host_file, const char* job_file,
- vector<string>* list) {
- int nprocs = 1;
- list->clear();
- // compute required #process from job conf
- if (job_file != nullptr) {
- ClusterProto cluster;
- google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file),
- &cluster);
- int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
- / cluster.nworkers_per_procs();
- int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
- / cluster.nservers_per_procs();
- if (cluster.server_worker_separate())
- nprocs = nworker_procs + nserver_procs;
- else
- nprocs = std::max(nworker_procs, nserver_procs);
- }
-#ifdef USE_ZOOKEEPER
- // get available host list from global conf
- std::ifstream hostfile(host_file);
- if (!hostfile.is_open()) {
- LOG(FATAL) << "Cannot open file: " << host_file;
- }
- vector<string> hosts;
- string host;
- while (!hostfile.eof()) {
- getline(hostfile, host);
- if (!host.length() || host[0] == '#') continue;
- hosts.push_back(host);
- }
- if (!hosts.size()) {
- LOG(FATAL) << "Empty host file";
- }
- // read next host index
- char val[kZKBufSize];
- if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false;
- int next = atoi(val);
- // generate host list
- for (int i = 0; i < nprocs; ++i) {
- list->push_back(hosts[(next + i) % hosts.size()]);
- }
- // write next host index
- next = (next + nprocs) % hosts.size();
- snprintf(val, kZKBufSize, "%d", next);
- if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false;
-#else
- CHECK_EQ(nprocs, 1) << "To run multi-process job, please enable zookeeper";
- list->push_back("localhost");
-#endif
- return true;
-}
-
// Fills `procs` with one entry per process node registered under the job's
// proc directory in ZooKeeper.
// Returns false only when that directory cannot be listed. A missing job is
// logged and reported as success with an empty list. Without ZooKeeper,
// `procs` is just cleared.
bool JobManager::ListJobProcs(int job, vector<string>* procs) {
  procs->clear();
#ifdef USE_ZOOKEEPER
  string job_path = GetZKJobWorkspace(job);
  // check job path
  if (!zk_.Exist(job_path.c_str())) {
    LOG(ERROR) << "job " << job << " not exists";
    return true;
  }
  string proc_path = job_path + kZKPathJobProc;
  vector<string> vt;
  // check job proc path
  if (!zk_.GetChild(proc_path.c_str(), &vt)) {
    return false;
  }
  char buf[singa::kZKBufSize];
  for (string pname : vt) {
    pname = proc_path + "/" + pname;
    // Nodes whose value cannot be read are skipped silently.
    if (!zk_.GetNode(pname.c_str(), buf)) continue;
    // Rebuild an entry from the node value, modifying buf in place:
    // - each ':' is overwritten with '\0' and the (now truncated) C-string
    //   prefix is appended;
    // - each '|' appends the rest of the string starting at that '|'.
    // With one ':' followed by one '|', e.g. "a:b|c", the result is "a|c".
    // A value with neither character yields an empty entry.
    // NOTE(review): the value format is written elsewhere, presumably
    // "host:port|..."; confirm before relying on it.
    std::string proc = "";
    for (int i = 0; buf[i] != '\0'; ++i) {
      if (buf[i] == ':') {
        buf[i] = '\0';
        proc += buf;
      } else if (buf[i] == '|') {
        proc += buf + i;
      }
    }
    procs->push_back(proc);
  }
  if (!procs->size()) LOG(ERROR) << "job " << job << " not exists";
#endif
  return true;
}
-
-bool JobManager::ListJobs(vector<JobInfo>* jobs) {
- jobs->clear();
-#ifdef USE_ZOOKEEPER
- vector<string> vt;
- // get all children in app path
- if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) {
- return false;
- }
- std::sort(vt.begin(), vt.end());
- int size = static_cast<int>(vt.size());
- vector<string> procs;
- for (int i = 0; i < size; ++i) {
- string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc;
- if (!zk_.GetChild(path.c_str(), &procs)) continue;
- JobInfo job;
- string jid = vt[i].substr(vt[i].length()-10);
- job.id = atoi(jid.c_str());
- job.procs = procs.size();
- jobs->push_back(job);
- // may need to delete it
- if (!job.procs && (i + kJobsNotRemoved < size))
- CleanPath(kZKPathApp + "/" + vt[i], true);
- }
-#else
- LOG(ERROR) << "Not supported without zookeeper";
-#endif
- return true;
-}
-
-bool JobManager::Remove(int job) {
-#ifdef USE_ZOOKEEPER
- string path = GetZKJobWorkspace(job) + kZKPathJobProc;
- if (zk_.Exist(path.c_str())) {
- return CleanPath(path.c_str(), false);
- }
-#else
- LOG(ERROR) << "Not supported without zookeeper";
-#endif
- return true;
-}
-
-bool JobManager::RemoveAllJobs() {
-#ifdef USE_ZOOKEEPER
- if (zk_.Exist(kZKPathApp.c_str())) {
- return CleanPath(kZKPathApp.c_str(), false);
- }
-#else
- LOG(ERROR) << "Not supported without zookeeper";
-#endif
- return true;
-}
-
-bool JobManager::CleanUp() {
-#ifdef USE_ZOOKEEPER
- if (zk_.Exist(kZKPathSinga.c_str())) {
- return CleanPath(kZKPathSinga.c_str(), true);
- }
-#else
- LOG(ERROR) << "Not supported without zookeeper";
-#endif
- return true;
-}
-
-bool JobManager::CleanPath(const string& path, bool remove) {
-#ifdef USE_ZOOKEEPER
- vector<string> child;
- if (!zk_.GetChild(path.c_str(), &child)) return false;
- for (string c : child) {
- if (!CleanPath(path + "/" + c, true)) return false;
- }
- if (remove) return zk_.DeleteNode(path.c_str());
-#else
- LOG(ERROR) << "Not supported without zookeeper";
-#endif
- return true;
-}
-
-// extract cluster configuration part from the job config file
-// TODO(wangsh) improve this function to make it robust
-string JobManager::ExtractClusterConf(const char* job_file) {
- std::ifstream fin(job_file);
- CHECK(fin.is_open()) << "cannot open job conf file " << job_file;
- string line;
- string cluster;
- bool in_cluster = false;
- while (!fin.eof()) {
- std::getline(fin, line);
- if (in_cluster == false) {
- size_t pos = line.find("cluster");
- if (pos == std::string::npos) continue;
- in_cluster = true;
- line = line.substr(pos);
- cluster = "";
- }
- if (in_cluster == true) {
- cluster += line + "\n";
- if (line.find("}") != std::string::npos)
- in_cluster = false;
- }
- }
- LOG(INFO) << "cluster configure: " << cluster;
- size_t s_pos = cluster.find("{");
- size_t e_pos = cluster.find("}");
- if (s_pos == std::string::npos || e_pos == std::string::npos) {
- LOG(FATAL) << "cannot extract valid cluster configuration in file: "
- << job_file;
- }
- return cluster.substr(s_pos + 1, e_pos - s_pos-1);
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/math_kernel.cu
----------------------------------------------------------------------
diff --git a/src/utils/math_kernel.cu b/src/utils/math_kernel.cu
deleted file mode 100644
index 65d7067..0000000
--- a/src/utils/math_kernel.cu
+++ /dev/null
@@ -1,450 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include <cmath>
-#include <algorithm>
-#include "singa/utils/math_kernel.h"
-#include "mshadow/tensor.h" // FLT_MIN?
-
-#define CU2DBLOCK_X 32
-#define CU2DBLOCK_Y 32
-
-#define CU1DBLOCK 1024
-#define CU1DBLOCKF 1024.0
-
-// Cuda Kernel Functions
-
// For each row r in [0, n) of `prob` (row length dim), subtracts
// log(max(prob[r * dim + label[r]], FLT_MIN)) from loss[r]. Clamping to
// FLT_MIN keeps log() finite.
// NOTE(review): this accumulates into loss[] rather than assigning, so callers
// presumably zero-initialize loss; confirm.
__global__
void kernel_softmax_loss(const float *prob, const int *label , float *loss,
    int n, int dim) {
  const int step = blockDim.x * gridDim.x;
  for (int row = blockIdx.x * blockDim.x + threadIdx.x; row < n; row += step) {
    const float p_true = prob[row * dim + label[row]];
    loss[row] -= log(max(p_true, FLT_MIN));
  }
}
-
// For each row r in [0, n) of `grad` (row length dim), replaces the entry at
// column label[r] with (entry - 1) * scale. Other entries are not touched.
__global__
void kernel_softmax_gradient(float *grad, const int *label ,
    int n, int dim, float scale) {
  const int step = blockDim.x * gridDim.x;
  for (int row = blockIdx.x * blockDim.x + threadIdx.x; row < n; row += step) {
    float* target = grad + row * dim + label[row];
    *target = (*target - 1.0f) * scale;
  }
}
-
// Single-block reduction: writes the sum of data[0..n) to *sum.
// Each thread first sums a strided slice into shared memory, then the block
// folds the partial sums in halves.
// Assumes one block with blockDim.x <= min(n, CU1DBLOCK): the first read
// data[threadIdx.x] and the aux[] index are unguarded.
__global__
void kernel_sum_vec(float *data, float *sum , int n) {
  const int nthreads = blockDim.x;
  const int tid = threadIdx.x;
  __shared__ float aux[CU1DBLOCK];

  float partial = data[tid];
  for (int pos = tid + nthreads; pos < n; pos += nthreads)
    partial += data[pos];
  aux[tid] = partial;
  __syncthreads();

  // Fold the upper half (rounded up) onto the lower half until one value remains.
  for (int active = nthreads; active > 1; active = (active + 1) >> 1) {
    const int half = (active + 1) >> 1;
    if (tid < half && tid + half < active)
      aux[tid] += aux[tid + half];
    __syncthreads();
  }

  __syncthreads();
  *sum = aux[0];
}
-
// For each row r in [0, rows) of a matrix with row pitch `stride`, writes the
// sum of its first `cols` entries to dst_vec_data[r]. Rows are distributed
// over threads with a grid-stride loop.
__global__
void kernel_sum_col(const float *src_mat_data,
    float *dst_vec_data, int rows, int cols, int stride) {
  const int step = blockDim.x * gridDim.x;
  for (int row = blockIdx.x * blockDim.x + threadIdx.x; row < rows;
       row += step) {
    const float* src_row = src_mat_data + row * stride;
    dst_vec_data[row] = 0.0f;
    for (int k = 0; k < cols; k++)
      dst_vec_data[row] += src_row[k];
  }
}
-
// Adds the rows of a (rows x cols) matrix with row pitch `stride`, using one
// block per column. Block j writes the sum of column j to dst_vec_data[j].
// Assumes blockDim.x <= min(rows, CU1DBLOCK): the first read and the aux[]
// index are unguarded.
__global__
void kernel_sum_row(const float *src_mat_data,
    float *dst_vec_data, int rows, int cols, int stride) {
  const int col = blockIdx.x;
  if (col >= cols) {
    return;
  }
  const int nthreads = blockDim.x;
  const int tid = threadIdx.x;
  __shared__ float aux[CU1DBLOCK];

  float partial = src_mat_data[col + tid * stride];
  for (int r = tid + nthreads; r < rows; r += nthreads)
    partial += src_mat_data[col + r * stride];
  aux[tid] = partial;
  __syncthreads();

  // Fold the upper half (rounded up) onto the lower half until one value remains.
  for (int active = nthreads; active > 1; active = (active + 1) >> 1) {
    const int half = (active + 1) >> 1;
    if (tid < half && tid + half < active)
      aux[tid] += aux[tid + half];
    __syncthreads();
  }

  __syncthreads();
  dst_vec_data[col] = aux[0];
}
-
// des[j][i] = src_mat[j][i] + src_vec[i] for a (rows x cols) matrix with row
// pitch `stride` (the same vector is added to every row).
// Uses a 2-D grid-stride loop: x covers columns and y covers rows. The two
// loops are nested so every (j, i) pair is visited for any grid size. The
// previous lockstep loop only visited a diagonal band once the grid was
// smaller than the matrix.
__global__
void kernel_add_vec_row(const float *src_vec_data, const float *src_mat_data,
    float* des_mat_data, int rows, int cols, int stride) {
  const int step_x = blockDim.x * gridDim.x;
  const int step_y = blockDim.y * gridDim.y;
  for (int j = blockIdx.y * blockDim.y + threadIdx.y; j < rows; j += step_y) {
    for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < cols;
         i += step_x) {
      const int index = j * stride + i;
      des_mat_data[index] = src_mat_data[index] + src_vec_data[i];
    }
  }
}
-
// Element-wise: des_data[i] = exp(src_data[i]) for i in [0, n), grid-stride.
__global__
void kernel_exp(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = exp(src_data[i]);
}
-
// Element-wise: des_data[i] = log(src_data[i]) for i in [0, n), grid-stride.
__global__
void kernel_log(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = log(src_data[i]);
}
-
// Element-wise logistic function: des_data[i] = 1 / (1 + exp(-src_data[i])).
__global__
void kernel_sigmoid(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = 1.0f / (1.0f + expf(-src_data[i]));
}
-
// Element-wise: des_data[i] = s * (1 - s) with s = src_data[i]. This is the
// sigmoid derivative when src_data holds sigmoid outputs.
__global__
void kernel_sigmoid_grad(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
    const float s = src_data[i];
    des_data[i] = s * (1.0f - s);
  }
}
-
// Element-wise: des_data[i] = max(src_data[i], 0).
__global__
void kernel_relu(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = max(src_data[i], 0.0f);
}
-
// Element-wise indicator: des_data[i] = 1 if src_data[i] > 0, else 0.
__global__
void kernel_relu_grad(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
    float mask = 0.0f;
    if (src_data[i] > 0.0f) mask = 1.0f;
    des_data[i] = mask;
  }
}
-
// Element-wise: des_data[i] = tanh(src_data[i]).
__global__
void kernel_tanh(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = tanhf(src_data[i]);
}
-
// Element-wise: des_data[i] = 1 - t * t with t = src_data[i]. This is the
// tanh derivative when src_data holds tanh outputs.
__global__
void kernel_tanh_grad(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
    const float t = src_data[i];
    des_data[i] = (1.0f - t * t);
  }
}
-
// Element-wise: des_data[i] = log(1 + exp(src_data[i])).
__global__
void kernel_softplus(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = logf(1 + expf(src_data[i]));
}
-
// Element-wise: des_data[i] = 1 / (1 + exp(-src_data[i])), i.e. the softplus
// derivative evaluated at the softplus input.
__global__
void kernel_softplus_grad(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = 1.0f / (1.0f + expf(-src_data[i]));
}
-
// Element-wise: des_data[i] = src_data[i] * src_data[i].
__global__
void kernel_square(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
    const float v = src_data[i];
    des_data[i] = v * v;
  }
}
-
// Element-wise: des_data[i] = 2 * sqrt(src_data[i]).
// NOTE(review): if src_data holds the squared output y = x*x, this yields
// 2*|x| and loses the sign of x. If it holds x, the derivative would be 2*x.
// Confirm which tensor callers pass.
__global__
void kernel_square_grad(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = 2 * sqrt(src_data[i]);
}
-
// Element-wise: des_data[i] = sqrt(src_data[i]).
__global__
void kernel_sqrt(const float *src_data, float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = sqrt(src_data[i]);
}
-
// Element-wise: des_data[i] = pow(src_data_a[i], src_data_b[i]).
__global__
void kernel_pow(const float *src_data_a, const float *src_data_b,
    float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = pow(src_data_a[i], src_data_b[i]);
}
-
// Element-wise product: des_data[i] = src_data_a[i] * src_data_b[i].
__global__
void kernel_mult(const float *src_data_a, const float *src_data_b,
    float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = src_data_a[i] * src_data_b[i];
}
-
// Element-wise quotient: des_data[i] = src_data_a[i] / src_data_b[i].
__global__
void kernel_div(const float *src_data_a, const float *src_data_b,
    float *des_data, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    des_data[i] = src_data_a[i] / src_data_b[i];
}
-
// Fills data[0..n) with `value`.
__global__ static
void kernel_set_value(float *data, float value, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step)
    data[i] = value;
}
-
// Element-wise indicator: des_data[i] = 1 if src_data[i] < alpha, else 0.
__global__
void kernel_threshold(const float *src_data, float *des_data,
    float alpha, int n) {
  const int step = blockDim.x * gridDim.x;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
    const bool below = src_data[i] < alpha;
    des_data[i] = below ? 1.0f : 0.0f;
  }
}
-
-//
namespace singa {

// Number of CU1DBLOCK-thread blocks needed to give each of `n` (> 0) elements
// its own thread. Uses integer ceiling division, with no float round-trip.
static int NumBlocks1D(int n) {
  return n / CU1DBLOCK + (n % CU1DBLOCK == 0 ? 0 : 1);
}

// Every wrapper below returns without launching when its problem size is not
// positive. CUDA rejects a launch with zero blocks or zero threads as an
// invalid configuration, and that error would otherwise be reported by a
// later, unrelated error check.

void singa_gpu_softmaxloss_forward(int n, int dim, const float *prob,
    const int *label, float *loss) {
  if (n <= 0) return;
  kernel_softmax_loss<<<NumBlocks1D(n), CU1DBLOCK>>>(prob, label, loss, n,
                                                      dim);
}

void singa_gpu_softmaxloss_backward(int n, int dim, float scale,
    const int *label, float *grad) {
  if (n <= 0) return;
  kernel_softmax_gradient<<<NumBlocks1D(n), CU1DBLOCK>>>(grad, label, n, dim,
                                                          scale);
}

// Writes the sum of data[0..n) to *sum. *sum is left untouched when n <= 0.
void singa_gpu_sum_vec(float *data, float *sum , int n) {
  if (n <= 0) return;
  int threads_per_block = n > CU1DBLOCK ? CU1DBLOCK : n;
  // kernel_sum_vec reduces within a single block.
  kernel_sum_vec<<<1, threads_per_block>>>(data, sum, n);
}

// Adds the rows together: dst_vec_data[j] = sum over r of src[r][j], using
// one block per column.
void singa_gpu_sum_row(const float *src_mat_data, float *dst_vec_data,
    int rows, int cols, int stride) {
  // kernel_sum_row reads one element per thread unconditionally, so rows == 0
  // cannot be launched.
  if (rows <= 0 || cols <= 0) return;
  int threads_per_block = rows > CU1DBLOCK ? CU1DBLOCK : rows;
  kernel_sum_row<<<cols, threads_per_block>>>(src_mat_data,
      dst_vec_data, rows, cols, stride);
}

// Adds the columns together: dst_vec_data[r] = sum over k of src[r][k].
// kernel_sum_col assigns one thread per row, so the grid is sized by `rows`
// only. cols == 0 yields zeros.
void singa_gpu_sum_col(const float *src_mat_data, float *dst_vec_data,
    int rows, int cols, int stride) {
  if (rows <= 0) return;
  kernel_sum_col<<<NumBlocks1D(rows), CU1DBLOCK>>>(src_mat_data,
      dst_vec_data, rows, cols, stride);
}

// des[j][i] = src_mat[j][i] + src_vec[i], using a 2-D grid that covers the
// matrix.
void singa_gpu_add_vec_row(const float *src_vec_data, const float *src_mat_data,
    float *des_mat_data , int rows, int cols, int stride) {
  if (rows <= 0 || cols <= 0) return;
  dim3 threads_per_block(CU2DBLOCK_X, CU2DBLOCK_Y);
  dim3 num_blocks((cols + CU2DBLOCK_X - 1) / CU2DBLOCK_X,
                  (rows + CU2DBLOCK_Y - 1) / CU2DBLOCK_Y);
  kernel_add_vec_row<<<num_blocks, threads_per_block>>>
      (src_vec_data, src_mat_data, des_mat_data, rows, cols, stride);
}

void singa_gpu_exp(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_exp<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_log(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_log<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_sigmoid(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_sigmoid<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_sigmoid_grad(const float *src_data, float *des_data,
    int n) {
  if (n <= 0) return;
  kernel_sigmoid_grad<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_relu(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_relu<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_relu_grad(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_relu_grad<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_tanh(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_tanh<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_tanh_grad(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_tanh_grad<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_softplus(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_softplus<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_softplus_grad(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_softplus_grad<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_square(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_square<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_square_grad(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_square_grad<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_sqrt(const float *src_data, float *des_data, int n) {
  if (n <= 0) return;
  kernel_sqrt<<<NumBlocks1D(n), CU1DBLOCK>>>(src_data, des_data, n);
}

void singa_gpu_pow(const float *src_data_a, const float *src_data_b,
    float *des_data, int n) {
  if (n <= 0) return;
  kernel_pow<<<NumBlocks1D(n), CU1DBLOCK>>>
      (src_data_a, src_data_b, des_data, n);
}

void singa_gpu_mult(const float *src_data_a, const float *src_data_b,
    float *des_data, int n) {
  if (n <= 0) return;
  kernel_mult<<<NumBlocks1D(n), CU1DBLOCK>>>
      (src_data_a, src_data_b, des_data, n);
}

void singa_gpu_div(const float *src_data_a, const float *src_data_b,
    float *des_data, int n) {
  if (n <= 0) return;
  kernel_div<<<NumBlocks1D(n), CU1DBLOCK>>>
      (src_data_a, src_data_b, des_data, n);
}

void singa_gpu_set_value(float *data, float value, int n) {
  if (n <= 0) return;
  kernel_set_value<<<NumBlocks1D(n), CU1DBLOCK>>>(data, value, n);
}

void singa_gpu_threshold(const float *src_data, float *des_data,
    float alpha, int n) {
  if (n <= 0) return;
  kernel_threshold<<<NumBlocks1D(n), CU1DBLOCK>>>
      (src_data, des_data, alpha, n);
}

}  // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
deleted file mode 100644
index 73d8314..0000000
--- a/src/utils/param.cc
+++ /dev/null
@@ -1,447 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
#include "singa/utils/param.h"

#include <glog/logging.h>
#include <cmath>
#include <memory>
#include <random>
#include <unordered_map>
#include "mshadow/tensor.h"
#include "singa/utils/factory.h"
#include "singa/utils/singleton.h"
#include "singa/utils/common.h"
-
-namespace singa {
-
-using mshadow::cpu;
-using mshadow::Random;
-using mshadow::Shape1;
-using mshadow::Tensor;
-using std::vector;
-using std::string;
-
-ParamGenerator* ParamGenerator::Create(const ParamGenProto& proto) {
- auto factory = Singleton<Factory<ParamGenerator>>::Instance();
- ParamGenerator * gen = nullptr;
- if (proto.has_user_type())
- gen = factory->Create(proto.user_type());
- else
- gen = factory->Create(proto.type());
- gen->Init(proto);
- return gen;
-}
-
-void ParamGenerator::Fill(Blob<float>* blob) {
- Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
- data = proto_.value();
-}
-
-void GaussianGen::Fill(Blob<float>* blob) {
- Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
- auto random = TSingleton<Random<cpu>>::Instance();
- random->SampleGaussian(data, proto_.mean(), proto_.std());
- if (proto_.value() != 1)
- data *= proto_.value();
-}
-
-void GaussianSqrtFanInGen::Fill(Blob<float>* blob) {
- // only valid for param matrix with num of cols as fan in
- CHECK_EQ(blob->shape().size(), 2);
- Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
- GaussianGen::Fill(blob);
- data /= sqrt(blob->shape().at(1));
-}
-
-void UniformGen::Fill(Blob<float>* blob) {
- Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
- auto random = TSingleton<Random<cpu>>::Instance();
- random->SampleUniform(data, proto_.low(), proto_.high());
- if (proto_.value() != 1)
- data *= proto_.value();
-}
-
-void UniformSqrtFanInGen::Fill(Blob<float>* blob) {
- // only valid for param matrix with num of cols as fan in
- CHECK_EQ(blob->shape().size(), 2);
- Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
- UniformGen::Fill(blob);
- data /= sqrt(blob->shape().at(1) / 3.0f);
-}
-
-void UniformSqrtFanInOutGen::Fill(Blob<float>* blob) {
- // only valid for param matrix with num of cols as fan in
- CHECK_EQ(blob->shape().size(), 2);
- Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
- UniformGen::Fill(blob);
- data /= sqrt(blob->shape()[0] + blob->shape()[1]);
-}
-
-/****************** Param functions *********************************/
-Param* Param::Create(const ParamProto& proto) {
- Factory<Param>* factory = Singleton<Factory<Param>>::Instance();
- Param* p = nullptr;
- if (proto.has_user_type())
- p = factory->Create(proto.user_type());
- else
- p = factory->Create(proto.type());
- p->Init(proto);
- return p;
-}
-
-const vector<int> Param::ComputeSlices(int num, const vector<Param*>& params) {
- // collect sizes of unique Params
- std::vector<int> paramsize;
- for (auto param : params)
- if (param->id() == param->owner())
- paramsize.push_back(param->size());
- // slice into lcm pieces to achieve good load-balance for both intra-group
- // partition (among servers in a group) and inter-group partition (each group
- // is assgined a sub-set of slices)
- auto param_slice = Slice(num, paramsize);
- vector<int> slices;
- for (auto const vec : param_slice)
- for (int len : vec)
- slices.push_back(len);
- return slices;
-}
-
-void Param::SliceParams(int num, const vector<Param*>& params) {
- auto slices = ComputeSlices(num, params);
- // construct map from Param ID to its slices <slice id, len>
- std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
- int slice_id = 0;
- auto it = slices.begin();
- for (auto param : params) {
- if (param->id() == param->owner()) {
- int len = 0;
- while (len < param->size() && it != slices.end()) {
- paramid2slices[param->id()].push_back(std::make_pair(slice_id++, *it));
- len += *it;
- it++;
- }
- CHECK_EQ(param->size(), len) << "length misamtch for ID=" << param->id();
- }
- }
- for (auto param : params) {
- for (auto entry : paramid2slices[param->owner()]) {
- param->AddSlice(entry.first, entry.second);
- LOG(INFO) << "param id " << param->id() << " owner=" << param->owner()
- << ", slice id = " << entry.first << ", size = " << entry.second;
- }
- }
-}
-
-void Param::Setup(const vector<int>& shape) {
- data_.Reshape(shape);
- grad_.Reshape(shape);
- history_.Reshape(shape);
- update_.Reshape(shape);
-}
-
-void Param::InitValues() {
- InitValues(0);
-}
-
-void Param::InitValues(int version) {
- ParamGenerator* gen = ParamGenerator::Create(proto_.init());
- gen->Fill(&data_);
- set_version(version);
-}
-
-void Param::ShareDataFrom(Param* other, bool cpu_only) {
- if (this == other) {
- LOG(WARNING) << "No need to share Param with itself";
- return;
- }
-
- proto_.set_owner(other->owner());
- CHECK_EQ(data_.count(), other->data_.count());
- data_.ShareData(&(other->data_), cpu_only);
- if (grad_.count() == 0)
- grad_.Reshape(data_.shape());
- version_ = other->version_;
- last_version_ = other->last_version_;
- slice_start_ = other->slice_start_;
- num_slices_ = other->num_slices_;
- slice_offset_ = other->slice_offset_;
- slice_size_ = other->slice_size_;
- // change pending list size equal to slice size
- pending_get_.resize(other->pending_get_.size());
- pending_update_.resize(other->pending_update_.size());
-}
-
-void Param::ShareFrom(Param* other) {
- if (this == other) {
- LOG(WARNING) << "No need to share Param with itself";
- return;
- }
-
- ShareDataFrom(other, false);
- grad_.ShareData(&(other->grad_), false);
-}
-
-void Param::FromProto(const string str) {
- BlobProto blob;
- blob.ParseFromString(str);
- data_.FromProto(blob);
-}
-
-void Param::FromProto(const BlobProto& blob) {
- data_.FromProto(blob);
-}
-
-void Param::ToProto(BlobProto* blob) {
- data_.ToProto(blob);
-}
-
-// Append a slice of `size` elements with id `slice_id`. Slices are laid out
-// back to back in data_, so the new slice starts where the last one ends.
-void Param::AddSlice(int slice_id, int size) {
-  int offset = 0;
-  if (slice_size_.empty()) {
-    // first slice: it fixes the id this Param's slices start at
-    slice_start_ = slice_id;
-  } else {
-    // slices must be appended in consecutive id order
-    CHECK_EQ(slice_start_ + num_slices_, slice_id);
-    offset = slice_offset_.back() + slice_size_.back();
-  }
-  slice_offset_.push_back(offset);
-  slice_size_.push_back(size);
-  pending_get_.push_back(false);
-  pending_update_.push_back(false);
-  ++num_slices_;
-}
-
-// Build a kPut message for slice `idx`. The header frame is
-// (slice size, lr_scale, wd_scale, address). When `copy` is set the address
-// is nullptr and the slice values follow in a separate frame; otherwise the
-// receiver gets the address of the slice inside data_.
-Msg* Param::GenPutMsg(bool copy, int idx) {
-  CHECK_LT(idx, num_slices_);
-  const void* slice = data_.cpu_data() + slice_offset_[idx];
-  const auto nbytes = slice_size_[idx] * sizeof(float);
-  Msg* msg = new Msg();
-  msg->set_type(kPut);
-  const void* addr = copy ? nullptr : slice;
-  msg->AddFormatFrame("iffp", slice_size_[idx], lr_scale(), wd_scale(), addr);
-  if (copy) msg->AddFrame(slice, nbytes);
-  return msg;
-}
-
-// Build a kGet request for slice `idx`. The header carries the copy flag and
-// the address inside data_ where the slice values belong, and the slice is
-// marked as having an outstanding get request.
-Msg* Param::GenGetMsg(bool copy, int idx) {
-  CHECK_LT(idx, num_slices_);
-  auto* dst = data_.mutable_cpu_data() + slice_offset_[idx];
-  Msg* msg = new Msg();
-  msg->set_type(kGet);
-  msg->AddFormatFrame("ip", copy, dst);
-  // remember the outstanding request for this slice
-  pending_get_[idx] = true;
-  ++num_pending_requests_;
-  return msg;
-}
-
-// Build a kUpdate request carrying the gradient of slice `idx`: either a
-// copy of the gradient values (copy == true) or the address of the slice in
-// grad_, so that the server can share it. The slice is marked as having an
-// outstanding update request.
-Msg* Param::GenUpdateMsg(bool copy, int idx) {
-  CHECK_LT(idx, num_slices_);
-  const void* grad_slice = grad_.cpu_data() + slice_offset_[idx];
-  Msg* msg = new Msg();
-  msg->set_type(kUpdate);
-  msg->AddFormatFrame("i", copy);
-  if (!copy) {
-    // to share values of grad blob
-    msg->AddFormatFrame("p", grad_slice);
-  } else {
-    msg->AddFrame(grad_slice, slice_size_[idx] * sizeof(float));
-  }
-  pending_update_[idx] = true;
-  ++num_pending_requests_;
-  return msg;
-}
-
-// Build a kSyncRequest for this Param, targeted with the last version.
-// NOTE: `offset` and `size` are currently ignored; the whole data blob is
-// always copied into the message, because sync happens between server
-// groups that may live in different processes.
-Msg* Param::GenSyncMsg(int offset, int size) {
-  Msg* msg = new Msg();
-  msg->set_type(kSyncRequest);
-  msg->set_trgt(ParamTrgt(-1, id()), last_version());
-  const auto nbytes = data_.count() * sizeof(float);
-  msg->AddFrame(mutable_cpu_data(), nbytes);
-  return msg;
-}
-
-// Initialize this (server-side) Param from a kPut message whose header is
-// (size, lr_scale, wd_scale, address). A null address means the values were
-// copied into the following frame; otherwise data_ is pointed at the
-// sender's buffer. No reply is produced, so nullptr is returned.
-Msg* Param::HandlePutMsg(Msg** msg, bool reserve) {
-  // TODO(wangsheng) remove the check later
-  CHECK(reserve);
-  int len;
-  float lr, wd;
-  float* remote;
-  (*msg)->ParseFormatFrame("iffp", &len, &lr, &wd, &remote);
-  ParamProto proto;
-  proto.set_lr_scale(lr);
-  proto.set_wd_scale(wd);
-  Init(proto);
-  Setup(vector<int>{len});
-  if (remote != nullptr) {
-    data_.set_cpu_data(remote);
-  } else {
-    // the values travel in the next frame
-    CHECK((*msg)->NextFrame());
-    CHECK_EQ(len * sizeof(float), (*msg)->FrameSize());
-    memcpy(mutable_cpu_data(), (*msg)->FrameData(), len * sizeof(float));
-  }
-  if (!reserve) DeleteMsg(msg);
-  return nullptr;
-}
-
-// Serve a kGet request for this Param and turn it into the kRGet reply.
-// The request header holds a copy flag and the requester's buffer address:
-//  - copy != 0: the values are appended to the message as a new frame;
-//  - otherwise, if that buffer differs from data_, the values are copied into
-//    it and data_ is re-pointed at it (see the comment below).
-// `reserve` must currently be false, so the request itself is reused as the
-// reply and *msg is reset to nullptr.
-// (Fixes "&copy" having been mis-encoded as a copyright sign.)
-Msg* Param::HandleGetMsg(Msg** msg, bool reserve) {
-  // TODO(wangsheng) remove the check later
-  CHECK(!reserve);
-  int copy;
-  float* ptr;
-  (*msg)->ParseFormatFrame("ip", &copy, &ptr);
-  if (copy) {
-    (*msg)->AddFrame(mutable_cpu_data(), sizeof(float) * size());
-  } else if (ptr != data_.cpu_data()) {
-    // this case reflects following situation:
-    // worker 0 and server are in the same process, while worker 1 is not.
-    // worker 1 "put" data into server, so server need to allocate memory.
-    // then worker 0 "get" data from server, so server need:
-    // 1. copy the data to the worker0 provided space
-    // 2. change its own pointer to that space in order to share memory
-    // in this case, the server always points to last worker's space
-    memcpy(ptr, data_.cpu_data(), sizeof(float) * size());
-    data_.set_cpu_data(ptr);
-  }
-  // else the mem space is shared among all worker and servers
-  Msg* ret = nullptr;
-  if (reserve) {
-    ret = new Msg(**msg);
-  } else {
-    // if not reserve the msg, we reuse it as return value
-    ret = *msg;
-    *msg = nullptr;
-  }
-  ret->SwapAddr();
-  ret->set_type(kRGet);
-  return ret;
-}
-
-// Aggregate the gradients carried by a batch of kUpdate messages into one
-// buffer and point grad_ at it.
-// Messages with copy == 0 carry the address of a gradient buffer shared with
-// the sender; the last such address becomes the accumulation target. If all
-// messages carried copies, the first message's frame buffer is used instead.
-// Every other distinct gradient buffer is summed element-wise into the target.
-// (Fixes "&copy" having been mis-encoded as a copyright sign, and avoids the
-// signed/unsigned comparison of CHECK_GT(msgs.size(), 0).)
-void Param::ParseUpdateMsgs(const vector<Msg*>& msgs) {
-  CHECK(!msgs.empty());
-  float* server_grad = nullptr;
-  vector<float*> worker_grad;
-  worker_grad.reserve(msgs.size());
-  for (auto* msg : msgs) {
-    int copy;
-    msg->ParseFormatFrame("i", &copy);
-    msg->NextFrame();
-    float* ptr = nullptr;
-    if (copy) {
-      ptr = static_cast<float*>(msg->FrameData());
-      CHECK_EQ(size() * sizeof(float), msg->FrameSize());
-    } else {
-      msg->ParseFormatFrame("p", &ptr);
-      server_grad = ptr;
-    }
-    worker_grad.push_back(ptr);
-  }
-  if (server_grad == nullptr)
-    server_grad = worker_grad.at(0);
-  for (float* grad : worker_grad) {
-    if (grad != server_grad) {
-      // TODO(wangsh) think about optimize it later?
-      for (int i = 0; i < size(); i++) {
-        server_grad[i] += grad[i];
-      }
-    }
-  }
-  grad_.set_cpu_data(server_grad);
-}
-
-// Turn buffered kUpdate requests into kRUpdate replies. Each request message
-// is reused as its reply (`reserve` must currently be false). For requests
-// sent with copy != 0, the current values of this Param are written back
-// into the frame that carried the gradient. Since the messages now belong to
-// the returned vector, *msgs is cleared.
-// (Fixes "&copy" having been mis-encoded as a copyright sign.)
-const vector<Msg*> Param::GenUpdateResponseMsgs(vector<Msg*>* msgs,
-                                                bool reserve) {
-  // TODO(wangsheng) remove the check later
-  CHECK(!reserve);
-  vector<Msg*> ret;
-  ret.reserve(msgs->size());
-  for (Msg* msg : *msgs) {
-    Msg* ptr = reserve ? new Msg(*msg) : msg;
-    ptr->FirstFrame();
-    ptr->SwapAddr();
-    ptr->set_type(kRUpdate);
-    int copy;
-    ptr->ParseFormatFrame("i", &copy);
-    if (copy) {
-      ptr->NextFrame();
-      CHECK_EQ(ptr->FrameSize(), sizeof(float) * size());
-      memcpy(ptr->FrameData(), mutable_cpu_data(), ptr->FrameSize());
-    }
-    ret.push_back(ptr);
-  }
-  // if not reserved, we remove all pointers
-  if (!reserve) msgs->clear();
-  return ret;
-}
-
-// Placeholder for sync requests. The message is discarded unless the caller
-// asks to keep it, and no reply is generated.
-Msg* Param::HandleSyncMsg(Msg** msg, bool reserve) {
-  // TODO(wangwei) handle it later
-  if (reserve) return nullptr;
-  DeleteMsg(msg);
-  return nullptr;
-}
-
-// Handle the reply to a get request for slice `slice_idx`: clear its
-// pending flag and copy in any values the reply carries.
-// Returns 1 when the decremented pending-request counter is a multiple of
-// num_slices_ (presumably: all outstanding slices answered -- confirm with
-// callers), else 0.
-// slice_idx is bounds-checked like in Gen*Msg, since indexing pending_get_
-// out of range would be undefined behaviour.
-int Param::ParseGetResponseMsg(Msg *msg, int slice_idx) {
-  CHECK_GE(slice_idx, 0);
-  CHECK_LT(slice_idx, num_slices_);
-  CHECK(pending_get_[slice_idx]) << slice_idx;
-  pending_get_[slice_idx] = false;
-  ParseResponseMsg(msg, slice_idx);
-  return (--num_pending_requests_) % num_slices_ == 0;
-}
-
-// Handle the reply to an update request for slice `slice_idx`: clear its
-// pending flag and copy in any updated values the reply carries.
-// Returns 1 when the decremented pending-request counter is a multiple of
-// num_slices_ (presumably: all outstanding slices answered -- confirm with
-// callers), else 0.
-// slice_idx is bounds-checked like in Gen*Msg, since indexing
-// pending_update_ out of range would be undefined behaviour.
-int Param::ParseUpdateResponseMsg(Msg *msg, int slice_idx) {
-  CHECK_GE(slice_idx, 0);
-  CHECK_LT(slice_idx, num_slices_);
-  CHECK(pending_update_[slice_idx]) << id() << " " << slice_idx;
-  pending_update_[slice_idx] = false;
-  ParseResponseMsg(msg, slice_idx);
-  return (--num_pending_requests_) % num_slices_ == 0;
-}
-
-// Placeholder for sync replies: nothing is parsed yet, and every reply is
-// reported as complete.
-int Param::ParseSyncResponseMsg(Msg* msg, int slice_idx) {
-  // TODO(wangwei) handle it later
-  const int kDone = 1;
-  return kDone;
-}
-
-// Common part of get/update reply handling: if the reply was sent with
-// copy != 0, copy the values in its next frame into slice `slice_idx` of
-// data_ (the frame size must match the slice size).
-// (Fixes "&copy" having been mis-encoded as a copyright sign, and drops a
-// stale commented-out log line that dereferenced data_ as a pointer.)
-void Param::ParseResponseMsg(Msg* msg, int slice_idx) {
-  int copy;
-  msg->ParseFormatFrame("i", &copy);
-  msg->NextFrame();
-  if (copy) {
-    CHECK_EQ(msg->FrameSize(), slice_size_[slice_idx] * sizeof(float));
-    memcpy(mutable_cpu_data() + slice_offset_[slice_idx],
-           msg->FrameData(), msg->FrameSize());
-  }
-}
-
-/************************ParamEntry***************************/
-// Create an entry expecting `total` shares, recording `p` as the first
-// stored share (AddParam stores only local Params in `shares`).
-ParamEntry::ParamEntry(int total, Param* p) {
-  shares.push_back(p);
-  num_total = total;
-}
-
-// Count one more share of this entry; local shares are also counted in
-// num_local and kept in `shares`.
-void ParamEntry::AddParam(bool local, Param* p) {
-  ++num_total;
-  if (local) {
-    ++num_local;
-    shares.push_back(p);
-  }
-}
-
-} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
deleted file mode 100644
index 3b1df72..0000000
--- a/src/utils/tool.cc
+++ /dev/null
@@ -1,169 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include <algorithm>
-#include <cstdio>
-#include <cstdlib>
-#include <cstring>
-#include <string>
-#include <vector>
-
-#include <glog/logging.h>
-
-#include "singa/proto/singa.pb.h"
-#include "singa/utils/common.h"
-#include "singa/utils/job_manager.h"
-
-// Directory holding singa.conf and hostfile ("conf" unless -confdir is set).
-std::string conf_dir;
-// Global SINGA configuration, read from <conf_dir>/singa.conf.
-singa::SingaProto global;
-// Process exit codes returned by the commands and by main.
-constexpr int SUCCESS = 0;
-constexpr int ARG_ERR = 1;
-constexpr int RUN_ERR = 2;
-
-// Print the log directory from the global config with trailing '/'
-// characters stripped (a lone "/" is kept).
-int getlogdir() {
-  std::string dir = global.log_dir();
-  while (dir.length() > 1 && dir.back() == '/') {
-    dir.pop_back();
-  }
-  printf("%s\n", dir.c_str());
-  return SUCCESS;
-}
-
-// Ask the job manager (connected to the configured zookeeper host) for a
-// new unique job id and print it.
-int create() {
-  singa::JobManager mngr(global.zookeeper_host());
-  int id;
-  if (mngr.Init() && mngr.GenerateJobID(&id)) {
-    printf("%d\n", id);
-    return SUCCESS;
-  }
-  return RUN_ERR;
-}
-
-// Select hosts for a job from <conf_dir>/hostfile and print one per line.
-// `job_conf` may be nullptr (main passes nullptr when no job conf is given);
-// how that case is handled is up to JobManager::GenerateHostList.
-// Hosts are iterated by const reference to avoid copying each string, and
-// the local list no longer shadows the list() command function.
-int genhost(char* job_conf) {
-  singa::JobManager mngr(global.zookeeper_host());
-  if (!mngr.Init()) return RUN_ERR;
-  std::vector<std::string> hosts;
-  if (!mngr.GenerateHostList((conf_dir + "/hostfile").c_str(), job_conf,
-                             &hosts))
-    return RUN_ERR;
-  // output selected hosts
-  for (const std::string& host : hosts)
-    printf("%s\n", host.c_str());
-  return SUCCESS;
-}
-
-// Print a table of job ids and process counts. Unless `all` is true, jobs
-// with a zero process count are skipped.
-// Jobs are iterated by const reference to avoid copying each JobInfo.
-int list(bool all) {
-  singa::JobManager mngr(global.zookeeper_host());
-  if (!mngr.Init()) return RUN_ERR;
-  std::vector<singa::JobInfo> jobs;
-  if (!mngr.ListJobs(&jobs)) return RUN_ERR;
-  printf("JOB ID |NUM PROCS \n");
-  printf("----------|-----------\n");
-  for (const singa::JobInfo& job : jobs) {
-    if (!job.procs && !all) continue;
-    printf("%-10d|%-10d\n", job.id, job.procs);
-  }
-  return SUCCESS;
-}
-
-// Print the processes registered for job `id`, one per line.
-// Entries are iterated by const reference to avoid copying each string.
-int view(int id) {
-  singa::JobManager mngr(global.zookeeper_host());
-  if (!mngr.Init()) return RUN_ERR;
-  std::vector<std::string> procs;
-  if (!mngr.ListJobProcs(id, &procs)) return RUN_ERR;
-  for (const std::string& s : procs) {
-    printf("%s\n", s.c_str());
-  }
-  return SUCCESS;
-}
-
-// Remove the zookeeper path of job `id`.
-int remove(int id) {
-  singa::JobManager mngr(global.zookeeper_host());
-  return (mngr.Init() && mngr.Remove(id)) ? SUCCESS : RUN_ERR;
-}
-
-// Remove the zookeeper paths of all jobs.
-int removeall() {
-  singa::JobManager mngr(global.zookeeper_host());
-  return (mngr.Init() && mngr.RemoveAllJobs()) ? SUCCESS : RUN_ERR;
-}
-
-// Clean all singa data stored in zookeeper.
-int cleanup() {
-  singa::JobManager mngr(global.zookeeper_host());
-  return (mngr.Init() && mngr.CleanUp()) ? SUCCESS : RUN_ERR;
-}
-
-// Entry point of singatool: dispatch one job/zookeeper management command.
-// Returns SUCCESS, ARG_ERR for wrong arguments (after logging the usage), or
-// RUN_ERR when the job manager reports a failure.
-int main(int argc, char **argv) {
-  std::string usage = "Usage: singatool <command> <args>\n"
-      " getlogdir : show log dir in global config\n"
-      " create : generate a unique job id\n"
-      " genhost <job conf> : generate a host list\n"
-      " list : list running singa jobs\n"
-      " listall : list all singa jobs\n"
-      " view <job id> : view procs of a singa job\n"
-      " remove <job id> : remove a job path in zookeeper\n"
-      " removeall : remove all job paths in zookeeper\n"
-      " cleanup : clean all singa data in zookeeper\n"
-      "[optional arguments] NOTICE: must put at end of a command\n"
-      " -confdir <dir> : path to singa global conf dir";
-
-  // set logging level to ERROR and log to STDERR only
-  google::LogToStderr();
-  google::SetStderrLogging(google::ERROR);
-  google::InitGoogleLogging(argv[0]);
-
-  // parse -confdir argument
-  int arg_pos = singa::ArgPos(argc, argv, "-confdir");
-  if (arg_pos != -1 && arg_pos + 1 >= argc) {
-    // "-confdir" was the last argument: argv[arg_pos + 1] would be the
-    // terminating null pointer, and building a std::string from it is UB.
-    LOG(ERROR) << usage;
-    return ARG_ERR;
-  }
-  conf_dir = arg_pos == -1 ? "conf" : argv[arg_pos + 1];
-  // drop "-confdir <dir>" (documented to come last) from the command args
-  if (arg_pos != -1) argc -= 2;
-  singa::ReadProtoFromTextFile((conf_dir + "/singa.conf").c_str(), &global);
-
-  // stat code: ARG_ERR for wrong argument, RUN_ERR for runtime error
-  int stat = (argc <= 1) ? ARG_ERR : SUCCESS;
-  if (stat == SUCCESS) {
-    if (!strcmp(argv[1], "getlogdir"))
-      stat = getlogdir();
-    else if (!strcmp(argv[1], "create"))
-      stat = create();
-    else if (!strcmp(argv[1], "genhost"))
-      stat = (argc > 2) ? genhost(argv[2]) : genhost(nullptr);
-    else if (!strcmp(argv[1], "list"))
-      stat = list(false);
-    else if (!strcmp(argv[1], "listall"))
-      stat = list(true);
-    else if (!strcmp(argv[1], "view"))
-      stat = (argc > 2) ? view(atoi(argv[2])) : ARG_ERR;
-    else if (!strcmp(argv[1], "remove"))
-      stat = (argc > 2) ? remove(atoi(argv[2])) : ARG_ERR;
-    else if (!strcmp(argv[1], "removeall"))
-      stat = removeall();
-    else if (!strcmp(argv[1], "cleanup"))
-      stat = cleanup();
-    else
-      stat = ARG_ERR;
-  }
-
-  if (stat == ARG_ERR) LOG(ERROR) << usage;
-  return stat;
-}