You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/04/04 11:09:44 UTC
[1/4] incubator-singa git commit: SINGA-156 Remove the dependency on
ZMQ for single process training
Repository: incubator-singa
Updated Branches:
refs/heads/master 0233049ce -> 914c1e722
SINGA-156 Remove the dependency on ZMQ for single process training
Update driver, Server, Worker for using new Dealer/Router;
Implement Msg class without ZMQ.
Dealer must check its own msg queue to recv msgs.
The Router may recv msgs from msg queue or zmq (for inter-process comm).
There are more msgs from inter-comm, which may block the recving of
inter-comm msgs if not handled properly (e.g., stop recv zmq msg once
getting a msg from inter-comm).
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d8dffdf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d8dffdf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d8dffdf0
Branch: refs/heads/master
Commit: d8dffdf02f90f338388d02e5032d5ca8d0b561e9
Parents: 0233049
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Tue Mar 29 20:35:46 2016 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Sat Apr 2 12:12:52 2016 +0800
----------------------------------------------------------------------
include/singa/comm/msg.h | 9 +-
include/singa/comm/socket.h | 160 +++++++--------------
include/singa/utils/safe_queue.h | 263 ++++++++++++++++++++++++++++++++++
src/comm/msg.cc | 77 ++++++++--
src/comm/socket.cc | 188 ++++++++++--------------
src/driver.cc | 19 ++-
src/server.cc | 21 +--
src/stub.cc | 15 +-
src/utils/cluster_rt.cc | 4 +-
src/utils/param.cc | 2 +
src/worker.cc | 16 +--
11 files changed, 493 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/include/singa/comm/msg.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/msg.h b/include/singa/comm/msg.h
index 50a9b81..ade7fc8 100644
--- a/include/singa/comm/msg.h
+++ b/include/singa/comm/msg.h
@@ -22,10 +22,12 @@
#ifndef SINGA_COMM_MSG_H_
#define SINGA_COMM_MSG_H_
+#include <utility>
+
// TODO(wangwei): make it a compiler argument
#define USE_ZMQ
-#include <utility>
+#include <vector>
#ifdef USE_ZMQ
#include <czmq.h>
#endif
@@ -79,7 +81,7 @@ inline int AddrType(int addr) {
}
/**
- * Msg used to transfer Param info (gradient or value), feature blob, etc
+ * Msg used to transfer Param info (gradient or value), feature blob, etc.
* between workers, stubs and servers.
*
* Each msg has a source addr and dest addr identified by a unique integer.
@@ -225,6 +227,9 @@ class Msg {
#ifdef USE_ZMQ
zmsg_t* msg_ = nullptr;
zframe_t *frame_ = nullptr;
+#else
+ std::vector<std::pair<void*, int>> frames_;
+ unsigned idx_ = 0;
#endif
};
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/include/singa/comm/socket.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/socket.h b/include/singa/comm/socket.h
index fae9ccb..de8cbde 100644
--- a/include/singa/comm/socket.h
+++ b/include/singa/comm/socket.h
@@ -25,150 +25,98 @@
#ifdef USE_ZMQ
#include <czmq.h>
#endif
+
#include <map>
#include <string>
#include <vector>
+#include <unordered_map>
+#include "singa/utils/safe_queue.h"
#include "singa/comm/msg.h"
namespace singa {
-
-const std::string kInprocRouterEndpoint = "inproc://router";
-
-class SocketInterface {
+/**
+ * Worker and Server use Dealer to communicate with Stub.
+ * Stub uses Dealer to communicate with remote Stub.
+ */
+class Dealer {
public:
- virtual ~SocketInterface() {}
- /**
- * Send a message to connected socket(s), non-blocking. The message
- * will be deallocated after sending, thus should not be used after
- * calling Send();
- *
- * @param msg The message to be sent
- * @return 1 for success queuing the message for sending, 0 for failure
+ /**
+ * @param id used for identifying the msg queue of this dealer.
*/
- virtual int Send(Msg** msg) = 0;
+ Dealer(int id) : id_(id) {}
+ ~Dealer();
/**
- * Receive a message from any connected socket.
+ * Setup the connection with the remote router.
*
- * @return a message pointer if success; nullptr if failure
+ * For local router, there is no need to connect it.
+ *
+ * @param endpoint Identifier of the remote router to connect. It follows
+ * ZeroMQ's format, i.e., IP:port, where IP is the connected process.
+ * @return 1 connection sets up successfully; 0 otherwise
*/
- virtual Msg* Receive() = 0;
+ int Connect(const std::string& endpoint);
/**
- * @return Identifier of the implementation dependent socket. E.g., zsock_t*
- * for ZeroMQ implementation and rank for MPI implementation.
+ * Send a message to the local router (id=-1) or remote outer. It is
+ * non-blocking. The message will be deallocated after sending, thus
+ * should not be used after calling Send();
*/
- virtual void* InternalID() const = 0;
-};
-
-class Poller {
- public:
- Poller();
- explicit Poller(SocketInterface* socket);
- /**
- * Add a socket for polling; Multiple sockets can be polled together by
- * adding them into the same poller.
- */
- void Add(SocketInterface* socket);
- /**
- * Poll for all sockets added into this poller.
- * @param timeout Stop after this number of mseconds
- * @return pointer To the socket if it has one message in the receiving
- * queue; nullptr if no message in any sockets,
- */
- SocketInterface* Wait(int duration);
-
+ int Send(Msg** msg);
/**
- * @return true if the poller is terminated due to process interupt
- */
- virtual bool Terminated();
-
- protected:
-#ifdef USE_ZMQ
- zpoller_t *poller_;
- std::map<zsock_t*, SocketInterface*> zsock2Socket_;
-#endif
-};
-
-class Dealer : public SocketInterface {
- public:
- /*
- * @param id Local dealer ID within a procs if the dealer is from worker or
- * server thread, starts from 1 (0 is used by the router); or the connected
- * remote procs ID for inter-process dealers from the stub thread.
+ * Recv msg from local router.
+ *
+ * @param timeout return if waiting for timeout microseconds.
+ * @return a message pointer if success; nullptr if failure
*/
- Dealer();
- explicit Dealer(int id);
- ~Dealer() override;
- /**
- * Setup the connection with the router.
- *
- * @param endpoint Identifier of the router. For intra-process
- * connection, the endpoint follows the format of ZeroMQ, i.e.,
- * starting with "inproc://"; in Singa, since each process has one
- * router, hence we can fix the endpoint to be "inproc://router" for
- * intra-process. For inter-process, the endpoint follows ZeroMQ's
- * format, i.e., IP:port, where IP is the connected process.
- * @return 1 connection sets up successfully; 0 otherwise
- */
- int Connect(const std::string& endpoint);
- int Send(Msg** msg) override;
- Msg* Receive() override;
- void* InternalID() const override;
+ Msg* Receive(int timeout = 0);
protected:
- int id_ = -1;
+ std::string endpoint_;
+ int id_;
#ifdef USE_ZMQ
zsock_t* dealer_ = nullptr;
- zpoller_t* poller_ = nullptr;
#endif
};
-
-class Router : public SocketInterface {
+/**
+ * In Singa, since each process has one router used by Stub, hence we fix the
+ * router to use the msg queue indexed by -1.
+ */
+class Router {
public:
- Router();
- /**
- * There is only one router per procs, hence its local id is 0 and is not set
- * explicitly.
- *
- * @param bufsize Buffer at most this number of messages
- */
- explicit Router(int bufsize);
- ~Router() override;
+ ~Router();
/**
- * Setup the connection with dealers.
+ * Bind the router to an endpoint for recv msg from remote dealer.
+ * If the router is used for intra-communication only, then no need to call
+ * Bind.
*
- * It automatically binds to the endpoint for intra-process communication,
- * i.e., "inproc://router".
- *
- * @param endpoint The identifier for the Dealer socket in other process
+ * @param endpoint identifier for the Dealer socket in other process
* to connect. It has the format IP:Port, where IP is the host machine.
- * If endpoint is empty, it means that all connections are
- * intra-process connection.
* @return number of connected dealers.
*/
int Bind(const std::string& endpoint);
/**
- * If the destination socket has not connected yet, buffer this the message.
+ * Send msg to local dealers by pushing the msg into the msg queue indexed by
+ * dst of the msg.
+ */
+ int Send(Msg** msg);
+ /**
+ * Recv msg from local (msg queue) or remote dealer (via zmq).
*/
- int Send(Msg** msg) override;
- Msg* Receive() override;
- void* InternalID() const override;
+ Msg* Receive(int timeout = 0);
protected:
- int nBufmsg_ = 0;
- int bufsize_ = 100;
+ std::string endpoint_;
#ifdef USE_ZMQ
zsock_t* router_ = nullptr;
zpoller_t* poller_ = nullptr;
- std::map<int, zframe_t*> id2addr_;
- std::map<int, std::vector<zmsg_t*>> bufmsg_;
#endif
};
-#ifdef USE_MPI
-// TODO(wangsheng): add intra-process communication using shared queue
-std::vector<SafeQueue*> MPIQueues;
-#endif
-
+/**
+ * Used for intra-process communication.
+ * Each dealer/router has a SafeQueue for recieving msgs.
+ * The sender pushes msgs onto the queue of the reciever's queue.
+ */
+extern std::unordered_map<int, SafeQueue<Msg*>> msgQueues;
} // namespace singa
#endif // SINGA_COMM_SOCKET_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/include/singa/utils/safe_queue.h
----------------------------------------------------------------------
diff --git a/include/singa/utils/safe_queue.h b/include/singa/utils/safe_queue.h
new file mode 100644
index 0000000..99adbf0
--- /dev/null
+++ b/include/singa/utils/safe_queue.h
@@ -0,0 +1,263 @@
+#ifndef SINGA_UTILS_SAFE_QUEUE_H_
+#define SINGA_UTILS_SAFE_QUEUE_H_
+
+// source: http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
+#include <queue>
+#include <list>
+#include <mutex>
+#include <thread>
+#include <cstdint>
+#include <condition_variable>
+
+/** A thread-safe asynchronous queue */
+template <class T, class Container = std::list<T>>
+class SafeQueue {
+
+ typedef typename Container::value_type value_type;
+ typedef typename Container::size_type size_type;
+ typedef Container container_type;
+
+ public:
+
+ /*! Create safe queue. */
+ SafeQueue() = default;
+ SafeQueue (SafeQueue&& sq) {
+ m_queue = std::move (sq.m_queue);
+ }
+ SafeQueue (const SafeQueue& sq) {
+ std::lock_guard<std::mutex> lock (sq.m_mutex);
+ m_queue = sq.m_queue;
+ }
+
+ /*! Destroy safe queue. */
+ ~SafeQueue() {
+ std::lock_guard<std::mutex> lock (m_mutex);
+ }
+
+ /**
+ * Sets the maximum number of items in the queue. Defaults is 0: No limit
+ * \param[in] item An item.
+ */
+ void set_max_num_items (unsigned int max_num_items) {
+ m_max_num_items = max_num_items;
+ }
+
+ /**
+ * Pushes the item into the queue.
+ * \param[in] item An item.
+ * \return true if an item was pushed into the queue
+ */
+ bool push (const value_type& item) {
+ std::lock_guard<std::mutex> lock (m_mutex);
+
+ if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
+ return false;
+
+ m_queue.push (item);
+ m_condition.notify_one();
+ return true;
+ }
+
+ /**
+ * Pushes the item into the queue.
+ * \param[in] item An item.
+ * \return true if an item was pushed into the queue
+ */
+ bool push (const value_type&& item) {
+ std::lock_guard<std::mutex> lock (m_mutex);
+
+ if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
+ return false;
+
+ m_queue.push (item);
+ m_condition.notify_one();
+ return true;
+ }
+
+ /**
+ * Pops item from the queue. If queue is empty, this function blocks until item becomes available.
+ * \param[out] item The item.
+ */
+ void pop (value_type& item) {
+ std::unique_lock<std::mutex> lock (m_mutex);
+ m_condition.wait (lock, [this]() // Lambda funct
+ {
+ return !m_queue.empty();
+ });
+ item = m_queue.front();
+ m_queue.pop();
+ }
+
+ /**
+ * Pops item from the queue using the contained type's move assignment operator, if it has one..
+ * This method is identical to the pop() method if that type has no move assignment operator.
+ * If queue is empty, this function blocks until item becomes available.
+ * \param[out] item The item.
+ */
+ void move_pop (value_type& item) {
+ std::unique_lock<std::mutex> lock (m_mutex);
+ m_condition.wait (lock, [this]() // Lambda funct
+ {
+ return !m_queue.empty();
+ });
+ item = std::move (m_queue.front());
+ m_queue.pop();
+ }
+
+ /**
+ * Tries to pop item from the queue.
+ * \param[out] item The item.
+ * \return False is returned if no item is available.
+ */
+ bool try_pop (value_type& item) {
+ std::unique_lock<std::mutex> lock (m_mutex);
+
+ if (m_queue.empty())
+ return false;
+
+ item = m_queue.front();
+ m_queue.pop();
+ return true;
+ }
+
+ /**
+ * Tries to pop item from the queue using the contained type's move assignment operator, if it has one..
+ * This method is identical to the try_pop() method if that type has no move assignment operator.
+ * \param[out] item The item.
+ * \return False is returned if no item is available.
+ */
+ bool try_move_pop (value_type& item) {
+ std::unique_lock<std::mutex> lock (m_mutex);
+
+ if (m_queue.empty())
+ return false;
+
+ item = std::move (m_queue.front());
+ m_queue.pop();
+ return true;
+ }
+
+ /**
+ * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.
+ * \param[out] t An item.
+ * \param[in] timeout The number of microseconds to wait.
+ * \return true if get an item from the queue, false if no item is received before the timeout.
+ */
+ bool timeout_pop (value_type& item, std::uint64_t timeout) {
+ std::unique_lock<std::mutex> lock (m_mutex);
+
+ if (m_queue.empty())
+ {
+ if (timeout == 0)
+ return false;
+
+ if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
+ return false;
+ }
+
+ item = m_queue.front();
+ m_queue.pop();
+ return true;
+ }
+
+ /**
+ * Pops item from the queue using the contained type's move assignment operator, if it has one..
+ * If the queue is empty, blocks for timeout microseconds, or until item becomes available.
+ * This method is identical to the try_pop() method if that type has no move assignment operator.
+ * \param[out] t An item.
+ * \param[in] timeout The number of microseconds to wait.
+ * \return true if get an item from the queue, false if no item is received before the timeout.
+ */
+ bool timeout_move_pop (value_type& item, std::uint64_t timeout) {
+ std::unique_lock<std::mutex> lock (m_mutex);
+
+ if (m_queue.empty())
+ {
+ if (timeout == 0)
+ return false;
+
+ if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
+ return false;
+ }
+
+ item = std::move (m_queue.front());
+ m_queue.pop();
+ return true;
+ }
+
+ /**
+ * Gets the number of items in the queue.
+ * \return Number of items in the queue.
+ */
+ size_type size() const {
+ std::lock_guard<std::mutex> lock (m_mutex);
+ return m_queue.size();
+ }
+
+ /**
+ * Check if the queue is empty.
+ * \return true if queue is empty.
+ */
+ bool empty() const {
+ std::lock_guard<std::mutex> lock (m_mutex);
+ return m_queue.empty();
+ }
+
+ /**
+ * Swaps the contents.
+ * \param[out] sq The SafeQueue to swap with 'this'.
+ */
+ void swap (SafeQueue& sq) {
+ if (this != &sq) {
+ std::lock_guard<std::mutex> lock1 (m_mutex);
+ std::lock_guard<std::mutex> lock2 (sq.m_mutex);
+ m_queue.swap (sq.m_queue);
+
+ if (!m_queue.empty())
+ m_condition.notify_all();
+
+ if (!sq.m_queue.empty())
+ sq.m_condition.notify_all();
+ }
+ }
+
+ /*! The copy assignment operator */
+ SafeQueue& operator= (const SafeQueue& sq) {
+ if (this != &sq) {
+ std::lock_guard<std::mutex> lock1 (m_mutex);
+ std::lock_guard<std::mutex> lock2 (sq.m_mutex);
+ std::queue<T, Container> temp {sq.m_queue};
+ m_queue.swap (temp);
+
+ if (!m_queue.empty())
+ m_condition.notify_all();
+ }
+
+ return *this;
+ }
+
+ /*! The move assignment operator */
+ SafeQueue& operator= (SafeQueue && sq) {
+ std::lock_guard<std::mutex> lock (m_mutex);
+ m_queue = std::move (sq.m_queue);
+
+ if (!m_queue.empty()) m_condition.notify_all();
+
+ return *this;
+ }
+
+
+ private:
+
+ std::queue<T, Container> m_queue;
+ mutable std::mutex m_mutex;
+ std::condition_variable m_condition;
+ unsigned int m_max_num_items = 0;
+};
+
+/*! Swaps the contents of two SafeQueue objects. */
+template <class T, class Container>
+void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2) {
+ q1.swap (q2);
+}
+#endif // SINGA_UTILS_SAFE_QUEUE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/comm/msg.cc
----------------------------------------------------------------------
diff --git a/src/comm/msg.cc b/src/comm/msg.cc
index 5c33026..94f3074 100644
--- a/src/comm/msg.cc
+++ b/src/comm/msg.cc
@@ -22,18 +22,25 @@
#include "singa/comm/msg.h"
#include <glog/logging.h>
+#include <stdarg.h>
namespace singa {
-#ifdef USE_ZMQ
Msg::~Msg() {
+#ifdef USE_ZMQ
if (msg_ != nullptr)
zmsg_destroy(&msg_);
frame_ = nullptr;
+#else
+ for (auto& frame : frames_)
+ delete static_cast<char*>(frame.first);
+#endif
}
Msg::Msg() {
+#ifdef USE_ZMQ
msg_ = zmsg_new();
+#endif
}
Msg::Msg(const Msg& msg) {
@@ -42,51 +49,49 @@ Msg::Msg(const Msg& msg) {
type_ = msg.type_;
trgt_val_ = msg.trgt_val_;
trgt_version_ = msg.trgt_version_;
+#ifdef USE_ZMQ
msg_ = zmsg_dup(msg.msg_);
+#endif
}
Msg::Msg(int src, int dst) {
src_ = src;
dst_ = dst;
+#ifdef USE_ZMQ
msg_ = zmsg_new();
+#endif
}
void Msg::SwapAddr() {
std::swap(src_, dst_);
}
+#ifdef USE_ZMQ
int Msg::size() const {
return zmsg_content_size(msg_);
}
-
void Msg::AddFrame(const void* addr, int nBytes) {
zmsg_addmem(msg_, addr, nBytes);
}
-
int Msg::FrameSize() {
return zframe_size(frame_);
}
-
-void* Msg::FrameData() {
- return zframe_data(frame_);
-}
-
char* Msg::FrameStr() {
return zframe_strdup(frame_);
}
+void* Msg::FrameData() {
+ return zframe_data(frame_);
+}
bool Msg::NextFrame() {
frame_ = zmsg_next(msg_);
return frame_ != nullptr;
}
-
void Msg::FirstFrame() {
frame_ = zmsg_first(msg_);
}
-
void Msg::LastFrame() {
frame_ = zmsg_last(msg_);
}
-
void Msg::ParseFromZmsg(zmsg_t* msg) {
char* tmp = zmsg_popstr(msg);
sscanf(tmp, "%d %d %d %d %d",
@@ -103,6 +108,49 @@ zmsg_t* Msg::DumpToZmsg() {
return tmp;
}
+#else
+
+int Msg::size() const {
+ int s = 0;
+ for (auto& entry : frames_)
+ s += entry.second;
+ return s;
+}
+
+void Msg::AddFrame(const void* addr, int nBytes) {
+ char* tmp = new char[nBytes];
+ memcpy(tmp, addr, nBytes);
+ frames_.push_back(std::make_pair(tmp, nBytes));
+}
+
+int Msg::FrameSize() {
+ return frames_.at(idx_).second;
+}
+
+char* Msg::FrameStr() {
+ return static_cast<char*>(frames_.at(idx_).first);
+}
+
+void* Msg::FrameData() {
+ return frames_.at(idx_).first;
+}
+
+bool Msg::NextFrame() {
+ idx_++;
+// LOG(ERROR) << "idx " << idx_ << " vs size " << frames_.size();
+ return idx_ < frames_.size();
+}
+
+void Msg::FirstFrame() {
+ idx_ = 0;
+}
+
+void Msg::LastFrame() {
+ idx_ = frames_.size() - 1;
+}
+
+#endif
+
// frame marker indicating this frame is serialize like printf
#define FMARKER "*singa*"
@@ -156,14 +204,14 @@ int Msg::AddFormatFrame(const char *format, ...) {
CHECK_LE(size, kMaxFrameLen);
}
va_end(argptr);
- zmsg_addmem(msg_, dst, size);
+ AddFrame(dst, size);
return size;
}
int Msg::ParseFormatFrame(const char *format, ...) {
va_list argptr;
va_start(argptr, format);
- char* src = zframe_strdup(frame_);
+ char* src = FrameStr();
CHECK_STREQ(FMARKER, src);
int size = strlen(FMARKER) + 1;
while (*format) {
@@ -207,9 +255,8 @@ int Msg::ParseFormatFrame(const char *format, ...) {
format++;
}
va_end(argptr);
- delete src;
+ // delete src;
return size;
}
-#endif
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/comm/socket.cc
----------------------------------------------------------------------
diff --git a/src/comm/socket.cc b/src/comm/socket.cc
index 09a6913..8245398 100644
--- a/src/comm/socket.cc
+++ b/src/comm/socket.cc
@@ -23,158 +23,116 @@
#include <glog/logging.h>
namespace singa {
-
-#ifdef USE_ZMQ
-Poller::Poller() {
- poller_ = zpoller_new(nullptr);
-}
-
-Poller::Poller(SocketInterface* socket) {
- poller_ = zpoller_new(nullptr);
- Add(socket);
-}
-
-void Poller::Add(SocketInterface* socket) {
- zsock_t* zsock = static_cast<zsock_t*>(socket->InternalID());
- zpoller_add(poller_, zsock);
- zsock2Socket_[zsock] = socket;
-}
-
-SocketInterface* Poller::Wait(int timeout) {
- zsock_t* sock = static_cast<zsock_t*>(zpoller_wait(poller_, timeout));
- if (sock != nullptr)
- return zsock2Socket_[sock];
- else
- return nullptr;
-}
-
-bool Poller::Terminated() {
- return zpoller_terminated(poller_);
-}
-
-
-Dealer::Dealer() : Dealer(-1) {}
-
-Dealer::Dealer(int id) : id_(id) {
- dealer_ = zsock_new(ZMQ_DEALER);
- CHECK_NOTNULL(dealer_);
-}
-
+const int TIME_OUT = 2; // max blocking time in milliseconds.
+std::unordered_map<int, SafeQueue<Msg*>> msgQueues;
Dealer::~Dealer() {
+#ifdef USE_ZMQ
zsock_destroy(&dealer_);
+#endif
}
int Dealer::Connect(const std::string& endpoint) {
- CHECK_GT(endpoint.length(), 0);
- if (endpoint.length()) {
+ if (endpoint.length() > 0) {
+#ifdef USE_ZMQ
+ dealer_ = zsock_new(ZMQ_DEALER);
+ CHECK_NOTNULL(dealer_);
CHECK_EQ(zsock_connect(dealer_, "%s", endpoint.c_str()), 0);
- return 1;
+#else
+ LOG(FATAL) << "No message passing lib is linked";
+#endif
+ endpoint_ = endpoint;
}
- return 0;
+ return 1;
}
int Dealer::Send(Msg** msg) {
- zmsg_t* zmsg = (*msg)->DumpToZmsg();
- zmsg_send(&zmsg, dealer_);
- delete *msg;
- *msg = nullptr;
+ if (endpoint_.length()) {
+#ifdef USE_ZMQ
+ zmsg_t* zmsg = (*msg)->DumpToZmsg();
+ zmsg_send(&zmsg, dealer_);
+#else
+ LOG(FATAL) << "No message passing lib is linked";
+#endif
+ delete *msg;
+ *msg = nullptr;
+ } else {
+ msgQueues.at(-1).push(*msg);
+ }
return 1;
}
-Msg* Dealer::Receive() {
- zmsg_t* zmsg = zmsg_recv(dealer_);
- if (zmsg == nullptr)
- return nullptr;
- Msg* msg = new Msg();
- msg->ParseFromZmsg(zmsg);
+Msg* Dealer::Receive(int timeout) {
+ Msg* msg = nullptr;
+ if (timeout > 0) {
+ if(!msgQueues.at(id_).timeout_pop(msg, timeout))
+ return nullptr;
+ } else {
+ msgQueues.at(id_).pop(msg);
+ }
+ msg->FirstFrame();
return msg;
}
-void* Dealer::InternalID() const {
- return dealer_;
-}
-
-Router::Router() : Router(100) {}
-
-Router::Router(int bufsize) {
- nBufmsg_ = 0;
- bufsize_ = bufsize;
- router_ = zsock_new(ZMQ_ROUTER);
- CHECK_NOTNULL(router_);
- poller_ = zpoller_new(router_);
- CHECK_NOTNULL(poller_);
-}
-
Router::~Router() {
+#ifdef USE_ZMQ
zsock_destroy(&router_);
- for (auto it : id2addr_)
- zframe_destroy(&it.second);
- for (auto it : bufmsg_) {
- for (auto *msg : it.second)
- zmsg_destroy(&msg);
- }
+#endif
}
+
int Router::Bind(const std::string& endpoint) {
int port = -1;
- if (endpoint.length()) {
+ if (endpoint.length() > 0) {
+ endpoint_ = endpoint;
+#ifdef USE_ZMQ
+ router_ = zsock_new(ZMQ_ROUTER);
+ CHECK_NOTNULL(router_);
port = zsock_bind(router_, "%s", endpoint.c_str());
+ CHECK_NE(port, -1) << endpoint;
+ LOG(INFO) << "bind successfully to " << zsock_endpoint(router_);
+ poller_ = zpoller_new(router_);
+#else
+ LOG(FATAL) << "No message passing lib is linked";
+#endif
}
- CHECK_NE(port, -1) << endpoint;
- LOG(INFO) << "bind successfully to " << zsock_endpoint(router_);
return port;
}
int Router::Send(Msg **msg) {
- zmsg_t* zmsg = (*msg)->DumpToZmsg();
int dstid = (*msg)->dst();
- if (id2addr_.find(dstid) != id2addr_.end()) {
- // the connection has already been set up
- zframe_t* addr = zframe_dup(id2addr_[dstid]);
- zmsg_prepend(zmsg, &addr);
- zmsg_send(&zmsg, router_);
+ if (msgQueues.find(dstid) != msgQueues.end()) {
+ msgQueues.at(dstid).push(*msg);
} else {
- // the connection is not ready, buffer the message
- if (bufmsg_.size() == 0)
- nBufmsg_ = 0;
- bufmsg_[dstid].push_back(zmsg);
- ++nBufmsg_;
- CHECK_LE(nBufmsg_, bufsize_);
+ LOG(FATAL) << "The dst queue not exist for dstid = " << dstid;
}
- delete *msg;
- *msg = nullptr;
return 1;
}
-Msg* Router::Receive() {
- zmsg_t* zmsg = zmsg_recv(router_);
- if (zmsg == nullptr) {
- LOG(ERROR) << "Connection broken!";
- exit(0);
- }
- zframe_t* dealer = zmsg_pop(zmsg);
- Msg* msg = new Msg();
- msg->ParseFromZmsg(zmsg);
- if (id2addr_.find(msg->src()) == id2addr_.end()) {
- // new connection, store the sender's identfier and send buffered messages
- // for it
- id2addr_[msg->src()] = dealer;
- if (bufmsg_.find(msg->src()) != bufmsg_.end()) {
- for (auto& it : bufmsg_.at(msg->src())) {
- zframe_t* addr = zframe_dup(dealer);
- zmsg_prepend(it, &addr);
- zmsg_send(&it, router_);
+Msg* Router::Receive(int timeout) {
+ Msg* msg = nullptr;
+ if (timeout == 0)
+ timeout = TIME_OUT;
+ while (msg == nullptr) {
+#ifdef USE_ZMQ
+ if (router_ != nullptr) {
+ zsock_t* sock = static_cast<zsock_t*>(zpoller_wait(poller_, timeout));
+ if (sock != NULL) {
+ zmsg_t* zmsg = zmsg_recv(router_);
+ if (zmsg == nullptr) {
+ LOG(ERROR) << "Connection broken!";
+ exit(0);
+ }
+ zframe_t* dealer = zmsg_pop(zmsg);
+ zframe_destroy(&dealer);
+ Msg* remote_msg = new Msg();
+ remote_msg->ParseFromZmsg(zmsg);
+ msgQueues.at(-1).push(remote_msg);
}
- bufmsg_.erase(msg->src());
}
- } else {
- zframe_destroy(&dealer);
+#endif
+ msgQueues.at(-1).timeout_pop(msg, timeout * 10);
}
+ msg->FirstFrame();
return msg;
}
-void* Router::InternalID() const {
- return router_;
-}
-#endif
-
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 6163865..2952c62 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -25,6 +25,7 @@
#include <set>
#include <string>
#include <vector>
+#include "singa/comm/socket.h"
#include "singa/neuralnet/layer.h"
#include "singa/utils/common.h"
#include "singa/utils/tinydir.h"
@@ -231,12 +232,18 @@ void Driver::Train(const JobProto& job_conf) {
net->ToGraph(true).ToJson());
const vector<Worker*> workers = CreateWorkers(job_conf, net);
const vector<Server*> servers = CreateServers(job_conf, net);
-
-#ifdef USE_MPI
- int nthreads = workers.size() + servers.size() + 1;
- for (int i = 0; i < nthreads; i++)
- MPIQueues.push_back(make_shared<SafeQueue>());
-#endif
+ // Add msg queues for each socket
+ for (auto worker : workers) {
+ msgQueues[Addr(worker->grp_id(), worker->id(), kWorkerParam)];
+ msgQueues[Addr(worker->grp_id(), worker->id(), kWorkerLayer)];
+// LOG(ERROR) << "worker addr " << Addr(worker->grp_id(), worker->id(), kWorkerParam);
+// LOG(ERROR) << "worker addr " << Addr(worker->grp_id(), worker->id(), kWorkerLayer);
+ }
+ for (auto server : servers) {
+ msgQueues[Addr(server->grp_id(), server->id(), kServer)];
+// LOG(ERROR) << "server addr " << Addr(server->grp_id(), server->id(), kServer);
+ }
+ msgQueues[-1];
vector<std::thread> threads;
for (auto server : servers)
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/server.cc
----------------------------------------------------------------------
diff --git a/src/server.cc b/src/server.cc
index bd7b5f8..d5ef028 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -71,28 +71,15 @@ void Server::Run() {
n_pending_sync_.resize(slice2group_.size(), 0);
last_sync_.resize(slice2group_.size());
- // TODO(wangsh): give each dealer a unique id
- auto dealer = new Dealer(0);
- CHECK(dealer->Connect(kInprocRouterEndpoint));
- Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
- ping->set_type(kConnect);
- dealer->Send(&ping);
-
bool running = true;
CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
- Poller poll(dealer);
+ auto dealer = new Dealer(Addr(grp_id_, id_, kServer));
// start recv loop and process requests
while (running) {
- // must use poller here; otherwise Receive() gets stuck after workers stop.
- auto* sock = poll.Wait(cluster->poll_time());
- if (poll.Terminated()) {
- LOG(ERROR) << "Connection broken!";
- exit(0);
- } else if (sock == nullptr) {
+ // cannot use blocking Receive() here, it will get stuck after workers stop.
+ Msg* msg = dealer->Receive(cluster->poll_time());
+ if (msg == nullptr)
continue;
- }
- Msg* msg = dealer->Receive();
- if (msg == nullptr) break; // interrupted
Msg* response = nullptr;
int type = msg->type();
int slice_id = SliceID(msg->trgt_val());
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/stub.cc
----------------------------------------------------------------------
diff --git a/src/stub.cc b/src/stub.cc
index c06128c..c7658fc 100644
--- a/src/stub.cc
+++ b/src/stub.cc
@@ -43,11 +43,13 @@ Stub::~Stub() {
}
void Stub::Setup() {
router_ = new Router();
- router_->Bind(kInprocRouterEndpoint);
auto cluster = Cluster::Get();
- const string hostip = cluster->hostip();
- int port = router_->Bind("tcp://" + hostip + ":*");
- endpoint_ = hostip + ":" + std::to_string(port);
+ if (cluster->nprocs() > 1) {
+ const string hostip = cluster->hostip();
+ int port = router_->Bind("tcp://" + hostip + ":*");
+ endpoint_ = hostip + ":" + std::to_string(port);
+ } else
+ endpoint_ = "localhost";
}
/**
* Get a hash id for a Param object from a group.
@@ -116,6 +118,7 @@ void Stub::Run(const vector<int>& slice2server,
msg = msg_queue.front();
msg_queue.pop();
}
+// LOG(ERROR) << "stub recv msg " << msg;
int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
if (flag == kStub && (AddrProc(dst) == procs_id || AddrGrp(dst) == -1)) {
// the following statements are ordered!
@@ -174,6 +177,7 @@ void Stub::Run(const vector<int>& slice2server,
inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
inter_dealers[dst_procs]->Send(&msg);
} else {
+// LOG(ERROR) << "router send msg " << msg;
router_->Send(&msg);
}
}
@@ -186,7 +190,7 @@ void Stub::Run(const vector<int>& slice2server,
Dealer* Stub::CreateInterProcsDealer(int dst_procs) {
// forward to other procs
auto cluster = Cluster::Get();
- auto dealer = new Dealer();
+ auto dealer = new Dealer(-2);
while (cluster->endpoint(dst_procs) == "") {
// kCollectSleepTime));
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
@@ -223,6 +227,7 @@ void Stub::GenMsgs(int type, int version, ParamEntry* entry, Msg* msg,
new_msg->set_src(Addr(src_grp, procs_id, kStub));
new_msg->set_dst(Addr(dst_grp, server, kServer));
ret->push_back(new_msg);
+// LOG(ERROR) << "stub gen msg " << new_msg;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 7a04ff7..9a7b8bd 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
-*
+*
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index 158c777..e1c04c7 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -235,6 +235,7 @@ Msg* Param::GenPutMsg(bool copy, int idx) {
if (copy) {
msg->AddFrame(ptr, slice_size_[idx] * sizeof(float));
}
+// LOG(ERROR) << "gen put msg: " << msg;
return msg;
}
@@ -281,6 +282,7 @@ Msg* Param::HandlePutMsg(Msg** msg, bool reserve) {
int size;
float lr, wc;
float* ptr;
+// LOG(ERROR) << "handle put msg:" << *msg;
(*msg)->ParseFormatFrame("iffp", &size, &lr, &wc, &ptr);
ParamProto proto;
proto.set_lr_scale(lr);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d8dffdf0/src/worker.cc
----------------------------------------------------------------------
diff --git a/src/worker.cc b/src/worker.cc
index 1e35ff9..6c461ce 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -123,24 +123,13 @@ void Worker::Test(int steps, Phase phase, NeuralNet* net) {
Display(phase, " ", net);
}
-void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) {
- dealer->Connect(kInprocRouterEndpoint);
- Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub));
- ping->set_type(kConnect);
- dealer->Send(&ping);
-}
-
void Worker::InitSockets(const NeuralNet* net) {
- // TODO(wangsh): provide a unique sock id from cluster
- dealer_ = new Dealer(0);
- ConnectStub(grp_id_, id_, dealer_, kWorkerParam);
+ dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerParam));
for (auto layer : net->layers()) {
if (layer->partition_id() == id_) {
if (typeid(*layer) == typeid(BridgeDstLayer)
|| typeid(*layer) == typeid(BridgeSrcLayer)) {
- // TODO(wangsh): provide a unique socket id from cluster
- bridge_dealer_ = new Dealer(1);
- ConnectStub(grp_id_, id_, bridge_dealer_, kWorkerLayer);
+ bridge_dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerLayer));
break;
}
}
@@ -253,6 +242,7 @@ int Worker::Put(int step, Param* param) {
msg->set_trgt(ParamTrgt(param->owner(), 0), step);
msg->set_type(kPut);
dealer_->Send(&msg);
+// LOG(ERROR) << "worker msg " << msg;
return 1;
}
[2/4] incubator-singa git commit: SINGA-156 Remove the dependency on
ZMQ for single process training
Posted by wa...@apache.org.
SINGA-156 Remove the dependency on ZMQ for single process training
Update compiler arguments. Zookeeper and zmq will be used when "-enable-dist" is employed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/65b8c8df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/65b8c8df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/65b8c8df
Branch: refs/heads/master
Commit: 65b8c8dfceca42d0e9bcda4300c6ac70054c4d48
Parents: d8dffdf
Author: xiezl <xi...@comp.nus.edu.sg>
Authored: Sun Apr 3 22:28:45 2016 +0800
Committer: xiezl <xi...@comp.nus.edu.sg>
Committed: Sun Apr 3 22:28:45 2016 +0800
----------------------------------------------------------------------
Makefile.am | 26 +++++++++---------
configure.ac | 61 ++++++++++++++++++++++---------------------
include/singa/comm/msg.h | 2 +-
3 files changed, 44 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/65b8c8df/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index 95f7e81..f16d35a 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -195,10 +195,10 @@ libsinga_la_SOURCES += $(CUDNN_SRCS)
libsinga_la_CXXFLAGS += $(CUDNN_CFLAGS)
libsinga_la_LDFLAGS += $(CUDNN_LDFLAGS) $(CUDNN_LIBS)
endif
-if DZOOKEEPER
+if DDIST
libsinga_la_SOURCES += $(ZOOKEEPER_SRCS)
-libsinga_la_CXXFLAGS += $(ZOOKEEPER_CFLAGS)
-libsinga_la_LDFLAGS += $(ZOOKEEPER_LDFLAGS) $(ZOOKEEPER_LIBS)
+libsinga_la_CXXFLAGS += $(DIST_CFLAGS)
+libsinga_la_LDFLAGS += $(DIST_LDFLAGS) $(DIST_LIBS)
endif
if DHDFS
@@ -214,7 +214,6 @@ singa_LDFLAGS = -lsinga \
-lglog \
-lprotobuf \
-lopenblas \
- -lzmq \
-lczmq
if LMDB
singa_LDFLAGS += -llmdb
@@ -226,10 +225,10 @@ singa_CXXFLAGS += $(CUDA_CFLAGS)
singa_LDFLAGS += $(CUDA_LDFLAGS) $(CUDA_LIBS)
endif
-if DZOOKEEPER
+if DDIST
singa_SOURCES += $(ZOOKEEPER_SRCS)
-singa_CXXFLAGS += $(ZOOKEEPER_CFLAGS)
-singa_LDFLAGS += $(ZOOKEEPER_LDFLAGS) $(ZOOKEEPER_LIBS)
+singa_CXXFLAGS += $(DIST_CFLAGS)
+singa_LDFLAGS += $(DIST_LDFLAGS) $(DIST_LIBS)
endif
if DCUDNN
@@ -251,10 +250,10 @@ singatool_LDFLAGS = -lsinga \
-lglog \
-lprotobuf
-if DZOOKEEPER
+if DDIST
singatool_SOURCES += $(ZOOKEEPER_SRCS)
-singatool_CXXFLAGS += $(ZOOKEEPER_CFLAGS)
-singatool_LDFLAGS += $(ZOOKEEPER_LDFLAGS) $(ZOOKEEPER_LIBS)
+singatool_CXXFLAGS += $(DIST_CFLAGS)
+singatool_LDFLAGS += $(DIST_LDFLAGS) $(DIST_LIBS)
endif
#lib_LTLIBRARIES += libgtest.la
@@ -274,7 +273,6 @@ singatest_LDFLAGS = -lsinga \
-lglog \
-lprotobuf \
-lopenblas \
- -lzmq \
-lczmq \
-lgtest
if LMDB
@@ -293,10 +291,10 @@ singatest_CXXFLAGS += $(CUDNN_CFLAGS)
singatest_LDFLAGS += $(CUDNN_LDFLAGS) $(CUDNN_LIBS)
endif
-if DZOOKEEPER
+if DDIST
singatest_SOURCES += $(ZOOKEEPER_SRCS)
-singatest_CXXFLAGS += $(ZOOKEEPER_CFLAGS)
-singatest_LDFLAGS += $(ZOOKEEPER_LDFLAGS) $(ZOOKEEPER_LIBS)
+singatest_CXXFLAGS += $(DIST_CFLAGS)
+singatest_LDFLAGS += $(DIST_LDFLAGS) $(DIST_LIBS)
endif
_driver_la_SOURCES = $(PY_SRCS)
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/65b8c8df/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index af80c1e..2de8720 100644
--- a/configure.ac
+++ b/configure.ac
@@ -40,9 +40,6 @@ AC_PROG_LIBTOOL
AC_SEARCH_LIBS([cblas_sgemm], [openblas], [], [
AC_MSG_ERROR([unable to find cblas_sgemm() function])
])
-AC_SEARCH_LIBS([zmq_ctx_new], [zmq], [], [
- AC_MSG_ERROR([unable to find zmq_ctx_new() function])
- ])
AC_SEARCH_LIBS([zmsg_new], [czmq], [], [
AC_MSG_ERROR([unable to find zmsg_new() function])
])
@@ -116,7 +113,7 @@ AC_SUBST(CUDA_CFLAGS)
# Setup custom CUDNN paths
AC_ARG_ENABLE([cudnn],
[AS_HELP_STRING(--enable-cudnn,enable CUDNN support)],
- [enable_cudnn=yes], [enable_cudnn=no])
+ [enable_cudnn="yes"], [enable_cudnn="no"])
AM_CONDITIONAL(DCUDNN, [test "$enable_cudnn" = "yes"])
AC_ARG_WITH([cudnn],
[AS_HELP_STRING([--with-cudnn=PATH], [prefix where CUDNN is installed])],
@@ -147,37 +144,41 @@ AC_SUBST(CUDNN_LDFLAGS)
AC_SUBST(CUDNN_LIBS)
# Setup custom zookeeper paths
-AC_ARG_ENABLE(zookeeper,
- AS_HELP_STRING([--enable-zookeeper],[enable zookeeper support]),
- [enable_zookeeper=yes],[enable_zookeeper=no])
-AM_CONDITIONAL(DZOOKEEPER, test "$enable_zookeeper" = yes)
-AC_ARG_WITH([zookeeper],
- [AS_HELP_STRING([--with-zookeeper=PATH], [prefix where zookeeper is installed])],
- [zookeeper_prefix=$withval], [zookeeper_prefix="/usr/local"])
-if test "$zookeeper_prefix" == "yes"; then
+AC_ARG_ENABLE(dist,
+ AS_HELP_STRING([--enable-dist],[enable dist support]),
+ [enable_dist="yes"],[enable_dist="no"])
+AM_CONDITIONAL(DDIST, test "$enable_dist" = "yes")
+AC_ARG_WITH([dist],
+ [AS_HELP_STRING([--with-dist=PATH], [prefix where dist libraries,i.e.
+ zookeeper/zmq is installed])],
+ [dist_prefix=$withval], [dist_prefix="/usr/local"])
+if test "$dist_prefix" == "yes"; then
if test "$withval" == "yes"; then
- cudnn_prefix="/usr/local"
+ dist_prefix="/usr/local"
fi
fi
-if test x"$enable_zookeeper" != x"no"; then
- ZOOKEEPER_CFLAGS="-I$zookeeper_prefix/include"
- ZOOKEEPER_LDFLAGS="-L$zookeeper_prefix/lib"
- ZOOKEEPER_LIBS="-lzookeeper_mt"
- LIBS="$LIBS $ZOOKEEPER_LIBS"
- LDFLAGS="$LDFLAGS $ZOOKEEPER_LDFLAGS"
- DEBUG+=" -DUSE_ZOOKEEPER"
- AC_DEFINE(DZOOKEEPER,[1],[Defined if zookeeper should be used])
- AC_CHECK_LIB([zookeeper_mt], [main], [], [
- AC_MSG_ERROR([unable to find zookeeper library])
- ])
+if test x"$enable_dist" == x"yes"; then
+ AC_CHECK_LIB([zookeeper_mt], [main], [], [
+ AC_MSG_ERROR([unable to find zookeeper library])
+ ])
+ AC_SEARCH_LIBS([zmq_ctx_new], [zmq], [], [
+ AC_MSG_ERROR([unable to find zmq_ctx_new() function])
+ ])
+ DIST_CFLAGS="-I$dist_prefix/include"
+ DIST_LDFLAGS="-L$dist_prefix/lib"
+ DIST_LIBS="-lzookeeper_mt -lzmq"
+ LIBS="$LIBS $DIST_LIBS"
+ LDFLAGS="$LDFLAGS $DIST_LDFLAGS"
+ DEBUG+=" -DUSE_ZOOKEEPER -DUSE_ZMQ"
+ AC_DEFINE(DDIST,[1],[Defined if dist should be used])
else
- ZOOKEEPER_CFLAGS=""
- ZOOKEEPER_LDFLAGS=""
- ZOOKEEPER_LIBS=""
+ DIST_CFLAGS=""
+ DIST_LDFLAGS=""
+ DIST_LIBS=""
fi
-AC_SUBST(ZOOKEEPER_CFLAGS)
-AC_SUBST(ZOOKEEPER_LDFLAGS)
-AC_SUBST(ZOOKEEPER_LIBS)
+AC_SUBST(DIST_CFLAGS)
+AC_SUBST(DIST_LDFLAGS)
+AC_SUBST(DIST_LIBS)
# Setup custom lmdb paths
AC_ARG_ENABLE(lmdb,
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/65b8c8df/include/singa/comm/msg.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/msg.h b/include/singa/comm/msg.h
index ade7fc8..8e03cd5 100644
--- a/include/singa/comm/msg.h
+++ b/include/singa/comm/msg.h
@@ -25,7 +25,7 @@
#include <utility>
// TODO(wangwei): make it a compiler argument
-#define USE_ZMQ
+// #define USE_ZMQ
#include <vector>
#ifdef USE_ZMQ
[4/4] incubator-singa git commit: SINGA-156 Remove the dependency on
ZMQ for single process training
Posted by wa...@apache.org.
SINGA-156 Remove the dependency on ZMQ for single process training
bug fixing in communication part
check cpplint
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/914c1e72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/914c1e72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/914c1e72
Branch: refs/heads/master
Commit: 914c1e722d3c5e81c2f2bb4b1ffbd14a63f4aa3a
Parents: 42f5253
Author: WANG Sheng <wa...@gmail.com>
Authored: Mon Apr 4 13:53:06 2016 +0800
Committer: WANG Sheng <wa...@gmail.com>
Committed: Mon Apr 4 16:54:19 2016 +0800
----------------------------------------------------------------------
include/singa/comm/socket.h | 18 ++--
include/singa/utils/safe_queue.h | 141 +++++++++++++++++++-------------
src/comm/msg.cc | 5 +-
src/comm/socket.cc | 4 +-
src/stub.cc | 3 +-
src/test/test_connection_layers.cc | 12 ++-
6 files changed, 107 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/include/singa/comm/socket.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/socket.h b/include/singa/comm/socket.h
index 3194d8c..40d4cc3 100644
--- a/include/singa/comm/socket.h
+++ b/include/singa/comm/socket.h
@@ -43,17 +43,17 @@ class Dealer {
/**
* @param id used for identifying the msg queue of this dealer.
*/
- Dealer(int id);
+ explicit Dealer(int id);
~Dealer();
/**
- * Setup the connection with the remote router.
- *
- * For local router, there is no need to connect it.
- *
- * @param endpoint Identifier of the remote router to connect. It follows
- * ZeroMQ's format, i.e., IP:port, where IP is the connected process.
- * @return 1 connection sets up successfully; 0 otherwise
- */
+ * Setup the connection with the remote router.
+ *
+ * For local router, there is no need to connect it.
+ *
+ * @param endpoint Identifier of the remote router to connect. It follows
+ * ZeroMQ's format, i.e., IP:port, where IP is the connected process.
+ * @return 1 connection sets up successfully; 0 otherwise
+ */
int Connect(const std::string& endpoint);
/**
* Send a message to the local router (id=-1) or remote outer. It is
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/include/singa/utils/safe_queue.h
----------------------------------------------------------------------
diff --git a/include/singa/utils/safe_queue.h b/include/singa/utils/safe_queue.h
index 99adbf0..31df1ef 100644
--- a/include/singa/utils/safe_queue.h
+++ b/include/singa/utils/safe_queue.h
@@ -1,7 +1,35 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+/**
+ * The code is adapted from following source:
+ * http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
+ * under Creative Commons Attribution 4.0 International Public License
+ */
+
#ifndef SINGA_UTILS_SAFE_QUEUE_H_
#define SINGA_UTILS_SAFE_QUEUE_H_
// source: http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
+#include <algorithm>
#include <queue>
#include <list>
#include <mutex>
@@ -12,33 +40,31 @@
/** A thread-safe asynchronous queue */
template <class T, class Container = std::list<T>>
class SafeQueue {
-
typedef typename Container::value_type value_type;
typedef typename Container::size_type size_type;
typedef Container container_type;
public:
-
/*! Create safe queue. */
SafeQueue() = default;
- SafeQueue (SafeQueue&& sq) {
- m_queue = std::move (sq.m_queue);
+ SafeQueue(SafeQueue&& sq) {
+ m_queue = std::move(sq.m_queue);
}
- SafeQueue (const SafeQueue& sq) {
- std::lock_guard<std::mutex> lock (sq.m_mutex);
+ SafeQueue(const SafeQueue& sq) {
+ std::lock_guard<std::mutex> lock(sq.m_mutex);
m_queue = sq.m_queue;
}
/*! Destroy safe queue. */
~SafeQueue() {
- std::lock_guard<std::mutex> lock (m_mutex);
+ std::lock_guard<std::mutex> lock(m_mutex);
}
/**
* Sets the maximum number of items in the queue. Defaults is 0: No limit
* \param[in] item An item.
*/
- void set_max_num_items (unsigned int max_num_items) {
+ void set_max_num_items(unsigned int max_num_items) {
m_max_num_items = max_num_items;
}
@@ -47,13 +73,13 @@ class SafeQueue {
* \param[in] item An item.
* \return true if an item was pushed into the queue
*/
- bool push (const value_type& item) {
- std::lock_guard<std::mutex> lock (m_mutex);
+ bool push(const value_type& item) {
+ std::lock_guard<std::mutex> lock(m_mutex);
if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
return false;
- m_queue.push (item);
+ m_queue.push(item);
m_condition.notify_one();
return true;
}
@@ -63,13 +89,13 @@ class SafeQueue {
* \param[in] item An item.
* \return true if an item was pushed into the queue
*/
- bool push (const value_type&& item) {
- std::lock_guard<std::mutex> lock (m_mutex);
+ bool push(const value_type&& item) {
+ std::lock_guard<std::mutex> lock(m_mutex);
if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
return false;
- m_queue.push (item);
+ m_queue.push(item);
m_condition.notify_one();
return true;
}
@@ -78,12 +104,11 @@ class SafeQueue {
* Pops item from the queue. If queue is empty, this function blocks until item becomes available.
* \param[out] item The item.
*/
- void pop (value_type& item) {
- std::unique_lock<std::mutex> lock (m_mutex);
- m_condition.wait (lock, [this]() // Lambda funct
- {
+ void pop(value_type& item) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ m_condition.wait(lock, [this]() { // Lambda funct
return !m_queue.empty();
- });
+ });
item = m_queue.front();
m_queue.pop();
}
@@ -94,13 +119,12 @@ class SafeQueue {
* If queue is empty, this function blocks until item becomes available.
* \param[out] item The item.
*/
- void move_pop (value_type& item) {
- std::unique_lock<std::mutex> lock (m_mutex);
- m_condition.wait (lock, [this]() // Lambda funct
- {
+ void move_pop(value_type& item) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ m_condition.wait(lock, [this]() { // Lambda funct
return !m_queue.empty();
- });
- item = std::move (m_queue.front());
+ });
+ item = std::move(m_queue.front());
m_queue.pop();
}
@@ -109,8 +133,8 @@ class SafeQueue {
* \param[out] item The item.
* \return False is returned if no item is available.
*/
- bool try_pop (value_type& item) {
- std::unique_lock<std::mutex> lock (m_mutex);
+ bool try_pop(value_type& item) {
+ std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty())
return false;
@@ -126,13 +150,13 @@ class SafeQueue {
* \param[out] item The item.
* \return False is returned if no item is available.
*/
- bool try_move_pop (value_type& item) {
- std::unique_lock<std::mutex> lock (m_mutex);
+ bool try_move_pop(value_type& item) {
+ std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty())
return false;
- item = std::move (m_queue.front());
+ item = std::move(m_queue.front());
m_queue.pop();
return true;
}
@@ -143,15 +167,15 @@ class SafeQueue {
* \param[in] timeout The number of microseconds to wait.
* \return true if get an item from the queue, false if no item is received before the timeout.
*/
- bool timeout_pop (value_type& item, std::uint64_t timeout) {
- std::unique_lock<std::mutex> lock (m_mutex);
+ bool timeout_pop(value_type& item, std::uint64_t timeout) {
+ std::unique_lock<std::mutex> lock(m_mutex);
- if (m_queue.empty())
- {
+ if (m_queue.empty()) {
if (timeout == 0)
return false;
- if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
+ if (m_condition.wait_for(lock, std::chrono::microseconds(timeout))
+ == std::cv_status::timeout)
return false;
}
@@ -168,19 +192,19 @@ class SafeQueue {
* \param[in] timeout The number of microseconds to wait.
* \return true if get an item from the queue, false if no item is received before the timeout.
*/
- bool timeout_move_pop (value_type& item, std::uint64_t timeout) {
- std::unique_lock<std::mutex> lock (m_mutex);
+ bool timeout_move_pop(value_type& item, std::uint64_t timeout) {
+ std::unique_lock<std::mutex> lock(m_mutex);
- if (m_queue.empty())
- {
+ if (m_queue.empty()) {
if (timeout == 0)
return false;
- if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
+ if (m_condition.wait_for(lock, std::chrono::microseconds(timeout))
+ == std::cv_status::timeout)
return false;
}
- item = std::move (m_queue.front());
+ item = std::move(m_queue.front());
m_queue.pop();
return true;
}
@@ -190,7 +214,7 @@ class SafeQueue {
* \return Number of items in the queue.
*/
size_type size() const {
- std::lock_guard<std::mutex> lock (m_mutex);
+ std::lock_guard<std::mutex> lock(m_mutex);
return m_queue.size();
}
@@ -199,7 +223,7 @@ class SafeQueue {
* \return true if queue is empty.
*/
bool empty() const {
- std::lock_guard<std::mutex> lock (m_mutex);
+ std::lock_guard<std::mutex> lock(m_mutex);
return m_queue.empty();
}
@@ -207,11 +231,11 @@ class SafeQueue {
* Swaps the contents.
* \param[out] sq The SafeQueue to swap with 'this'.
*/
- void swap (SafeQueue& sq) {
+ void swap(SafeQueue& sq) {
if (this != &sq) {
- std::lock_guard<std::mutex> lock1 (m_mutex);
- std::lock_guard<std::mutex> lock2 (sq.m_mutex);
- m_queue.swap (sq.m_queue);
+ std::lock_guard<std::mutex> lock1(m_mutex);
+ std::lock_guard<std::mutex> lock2(sq.m_mutex);
+ m_queue.swap(sq.m_queue);
if (!m_queue.empty())
m_condition.notify_all();
@@ -224,10 +248,10 @@ class SafeQueue {
/*! The copy assignment operator */
SafeQueue& operator= (const SafeQueue& sq) {
if (this != &sq) {
- std::lock_guard<std::mutex> lock1 (m_mutex);
- std::lock_guard<std::mutex> lock2 (sq.m_mutex);
- std::queue<T, Container> temp {sq.m_queue};
- m_queue.swap (temp);
+ std::lock_guard<std::mutex> lock1(m_mutex);
+ std::lock_guard<std::mutex> lock2(sq.m_mutex);
+ std::queue<T, Container> temp{sq.m_queue};
+ m_queue.swap(temp);
if (!m_queue.empty())
m_condition.notify_all();
@@ -238,17 +262,15 @@ class SafeQueue {
/*! The move assignment operator */
SafeQueue& operator= (SafeQueue && sq) {
- std::lock_guard<std::mutex> lock (m_mutex);
- m_queue = std::move (sq.m_queue);
+ std::lock_guard<std::mutex> lock(m_mutex);
+ m_queue = std::move(sq.m_queue);
- if (!m_queue.empty()) m_condition.notify_all();
+ if (!m_queue.empty()) m_condition.notify_all();
return *this;
}
-
private:
-
std::queue<T, Container> m_queue;
mutable std::mutex m_mutex;
std::condition_variable m_condition;
@@ -257,7 +279,8 @@ class SafeQueue {
/*! Swaps the contents of two SafeQueue objects. */
template <class T, class Container>
-void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2) {
- q1.swap (q2);
+void swap(SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2) {
+ q1.swap(q2);
}
-#endif // SINGA_UTILS_SAFE_QUEUE_H_
+
+#endif // SINGA_UTILS_SAFE_QUEUE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/comm/msg.cc
----------------------------------------------------------------------
diff --git a/src/comm/msg.cc b/src/comm/msg.cc
index 94f3074..8128b46 100644
--- a/src/comm/msg.cc
+++ b/src/comm/msg.cc
@@ -128,7 +128,10 @@ int Msg::FrameSize() {
}
char* Msg::FrameStr() {
- return static_cast<char*>(frames_.at(idx_).first);
+ char* ret = new char[frames_.at(idx_).second];
+ memcpy(ret, static_cast<char*>(frames_.at(idx_).first),
+ frames_.at(idx_).second);
+ return ret;
}
void* Msg::FrameData() {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/comm/socket.cc
----------------------------------------------------------------------
diff --git a/src/comm/socket.cc b/src/comm/socket.cc
index aa1ee85..9afc54c 100644
--- a/src/comm/socket.cc
+++ b/src/comm/socket.cc
@@ -23,7 +23,7 @@
#include <glog/logging.h>
namespace singa {
-const int TIME_OUT = 2; // max blocking time in milliseconds.
+const int TIME_OUT = 2; // max blocking time in milliseconds.
std::unordered_map<int, SafeQueue<Msg*>> msgQueues;
Dealer::~Dealer() {
#ifdef USE_ZMQ
@@ -68,7 +68,7 @@ int Dealer::Send(Msg** msg) {
Msg* Dealer::Receive(int timeout) {
Msg* msg = nullptr;
if (timeout > 0) {
- if(!msgQueues.at(id_).timeout_pop(msg, timeout))
+ if (!msgQueues.at(id_).timeout_pop(msg, timeout))
return nullptr;
} else {
msgQueues.at(id_).pop(msg);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/stub.cc
----------------------------------------------------------------------
diff --git a/src/stub.cc b/src/stub.cc
index c7658fc..4bc8c3d 100644
--- a/src/stub.cc
+++ b/src/stub.cc
@@ -48,8 +48,9 @@ void Stub::Setup() {
const string hostip = cluster->hostip();
int port = router_->Bind("tcp://" + hostip + ":*");
endpoint_ = hostip + ":" + std::to_string(port);
- } else
+ } else {
endpoint_ = "localhost";
+ }
}
/**
* Get a hash id for a Param object from a group.
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/test/test_connection_layers.cc
----------------------------------------------------------------------
diff --git a/src/test/test_connection_layers.cc b/src/test/test_connection_layers.cc
index 6529840..cd7f5f5 100644
--- a/src/test/test_connection_layers.cc
+++ b/src/test/test_connection_layers.cc
@@ -118,11 +118,15 @@ TEST(ConnectionLayerTest, BridgeTest) {
ASSERT_EQ(dst.data(nullptr).shape(0), N);
ASSERT_EQ(dst.data(nullptr).shape(1), M);
+ msgQueues[-1];
+ msgQueues[Addr(0, 0, kWorkerLayer)];
+
// bind bridges to socket
- Router router(N);
- router.Bind("inproc://router");
- Dealer dealer(0);
- dealer.Connect("inproc://router");
+ // Router router(N);
+ Router router;
+ // router.Bind("inproc://router");
+ Dealer dealer(Addr(0, 0, kWorkerLayer));
+ // dealer.Connect("inproc://router");
std::unordered_map<std::string, Layer*> name2bridge;
name2bridge[src.name()] = &src;
name2bridge[dst.name()] = &dst;
[3/4] incubator-singa git commit: SINGA-156 Remove the dependency on
ZMQ for single process training
Posted by wa...@apache.org.
SINGA-156 Remove the dependency on ZMQ for single process training
Move msg queue init into dealer and router.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/42f5253e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/42f5253e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/42f5253e
Branch: refs/heads/master
Commit: 42f5253eacde9a0ab87d3b4ed2382a137d9652d6
Parents: 65b8c8d
Author: Wei Wang <wa...@comp.nus.edu.sg>
Authored: Mon Apr 4 16:52:51 2016 +0800
Committer: Wei Wang <wa...@comp.nus.edu.sg>
Committed: Mon Apr 4 16:52:51 2016 +0800
----------------------------------------------------------------------
include/singa/comm/socket.h | 3 ++-
include/singa/server.h | 2 ++
src/comm/socket.cc | 8 ++++++++
src/driver.cc | 12 ------------
src/server.cc | 12 ++++++------
src/worker.cc | 2 +-
6 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/include/singa/comm/socket.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/socket.h b/include/singa/comm/socket.h
index de8cbde..3194d8c 100644
--- a/include/singa/comm/socket.h
+++ b/include/singa/comm/socket.h
@@ -43,7 +43,7 @@ class Dealer {
/**
* @param id used for identifying the msg queue of this dealer.
*/
- Dealer(int id) : id_(id) {}
+ Dealer(int id);
~Dealer();
/**
* Setup the connection with the remote router.
@@ -83,6 +83,7 @@ class Dealer {
class Router {
public:
~Router();
+ Router();
/**
* Bind the router to an endpoint for recv msg from remote dealer.
* If the router is used for intra-communication only, then no need to call
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/include/singa/server.h
----------------------------------------------------------------------
diff --git a/include/singa/server.h b/include/singa/server.h
index 4bffeae..d95862d 100644
--- a/include/singa/server.h
+++ b/include/singa/server.h
@@ -126,6 +126,8 @@ class Server {
std::vector<int> n_pending_sync_;
std::vector<Blob<float>> last_sync_;
std::unordered_map<int, std::vector<Msg*>> buffer_requests_;
+
+ Dealer* dealer_;
};
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/comm/socket.cc
----------------------------------------------------------------------
diff --git a/src/comm/socket.cc b/src/comm/socket.cc
index 8245398..aa1ee85 100644
--- a/src/comm/socket.cc
+++ b/src/comm/socket.cc
@@ -31,6 +31,10 @@ Dealer::~Dealer() {
#endif
}
+Dealer::Dealer(int id) : id_ (id) {
+ msgQueues[id];
+}
+
int Dealer::Connect(const std::string& endpoint) {
if (endpoint.length() > 0) {
#ifdef USE_ZMQ
@@ -79,6 +83,10 @@ Router::~Router() {
#endif
}
+Router::Router() {
+ msgQueues[-1];
+}
+
int Router::Bind(const std::string& endpoint) {
int port = -1;
if (endpoint.length() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 2952c62..b8f6735 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -232,18 +232,6 @@ void Driver::Train(const JobProto& job_conf) {
net->ToGraph(true).ToJson());
const vector<Worker*> workers = CreateWorkers(job_conf, net);
const vector<Server*> servers = CreateServers(job_conf, net);
- // Add msg queues for each socket
- for (auto worker : workers) {
- msgQueues[Addr(worker->grp_id(), worker->id(), kWorkerParam)];
- msgQueues[Addr(worker->grp_id(), worker->id(), kWorkerLayer)];
-// LOG(ERROR) << "worker addr " << Addr(worker->grp_id(), worker->id(), kWorkerParam);
-// LOG(ERROR) << "worker addr " << Addr(worker->grp_id(), worker->id(), kWorkerLayer);
- }
- for (auto server : servers) {
- msgQueues[Addr(server->grp_id(), server->id(), kServer)];
-// LOG(ERROR) << "server addr " << Addr(server->grp_id(), server->id(), kServer);
- }
- msgQueues[-1];
vector<std::thread> threads;
for (auto server : servers)
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/server.cc
----------------------------------------------------------------------
diff --git a/src/server.cc b/src/server.cc
index d5ef028..3b72243 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -44,6 +44,7 @@ Server::Server(int group_id, int server_id,
updater_ = Updater::Create(job_conf.updater());
slice2group_ = slice2group;
slice2server_ = slice2server;
+ dealer_ = new Dealer(Addr(grp_id_, id_, kServer));
}
Server::~Server() {
@@ -52,6 +53,7 @@ Server::~Server() {
for (auto entry : shard_)
for (auto param : entry.second->shares)
delete param;
+ delete dealer_;
}
void Stop(void* running) {
@@ -73,11 +75,10 @@ void Server::Run() {
bool running = true;
CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
- auto dealer = new Dealer(Addr(grp_id_, id_, kServer));
// start recv loop and process requests
while (running) {
// cannot use blocking Receive() here, it will get stuck after workers stop.
- Msg* msg = dealer->Receive(cluster->poll_time());
+ Msg* msg = dealer_->Receive(cluster->poll_time());
if (msg == nullptr)
continue;
Msg* response = nullptr;
@@ -97,7 +98,7 @@ void Server::Run() {
break;
case kUpdate:
for (auto reply : HandleUpdate(&msg))
- dealer->Send(&reply);
+ dealer_->Send(&reply);
break;
case kSyncRequest:
response = HandleSyncRequest(&msg);
@@ -111,16 +112,15 @@ void Server::Run() {
}
}
if (response != nullptr)
- dealer->Send(&response);
+ dealer_->Send(&response);
}
// send stop msg to stub
Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
msg->set_type(kStop);
- dealer->Send(&msg);
+ dealer_->Send(&msg);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
- delete dealer;
}
Msg* Server::HandlePut(Msg **msg) {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/42f5253e/src/worker.cc
----------------------------------------------------------------------
diff --git a/src/worker.cc b/src/worker.cc
index 6c461ce..5206513 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -53,7 +53,7 @@ void Worker::Setup(int grp_id, int id, const JobProto& conf,
train_net_ = train_net;
val_net_ = val_net;
test_net_ = test_net;
- bridge_dealer_ = dealer_ = nullptr;
+ InitSockets(train_net);
}
Worker::~Worker() {