You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/08/10 05:30:40 UTC
[4/7] incubator-singa git commit: SINGA-233 Fix bugs in sending large
messages
SINGA-233 Fix bugs in sending large messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d308e06d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d308e06d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d308e06d
Branch: refs/heads/dev
Commit: d308e06d6478203b82a0857efab9ce26e3375b09
Parents: 90a1cd5
Author: caiqc <ca...@comp.nus.edu.sg>
Authored: Fri Aug 5 00:03:24 2016 +0800
Committer: caiqc <ca...@comp.nus.edu.sg>
Committed: Wed Aug 10 09:42:46 2016 +0800
----------------------------------------------------------------------
include/singa/io/network/endpoint.h | 5 +
src/io/network/endpoint.cc | 178 +++++++++++++++++++++----------
test/CMakeLists.txt | 15 ++-
test/singa/test_ep.cc | 8 +-
4 files changed, 142 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/include/singa/io/network/endpoint.h
----------------------------------------------------------------------
diff --git a/include/singa/io/network/endpoint.h b/include/singa/io/network/endpoint.h
index 1079fcc..063ca11 100644
--- a/include/singa/io/network/endpoint.h
+++ b/include/singa/io/network/endpoint.h
@@ -50,6 +50,9 @@ namespace singa {
#define MAX_RETRY_CNT 3
+#define EV_WATCHER_STOP 0
+#define EV_WATCHER_START 1
+
class NetworkThread;
class EndPointFactory;
@@ -62,6 +65,7 @@ class EndPoint {
std::mutex mtx_;
struct sockaddr_in addr_;
int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other
+ int pfd_ = -1;
int conn_status_ = CONN_INIT;
int pending_cnt_ = 0;
int retry_cnt_ = 0;
@@ -113,6 +117,7 @@ class NetworkThread{
void doWork();
int asyncSend(int);
void asyncSendPendingMsg(EndPoint*);
+ void afterConnEst(EndPoint* ep, int fd, bool active);
public:
EndPointFactory* epf_;
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/src/io/network/endpoint.cc
----------------------------------------------------------------------
diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc
index 7fe72b3..a74f5c2 100644
--- a/src/io/network/endpoint.cc
+++ b/src/io/network/endpoint.cc
@@ -283,17 +283,21 @@ void NetworkThread::onNewEp() {
ev_io_start(this->loop_, &this->fd_wwatcher_map_[fd]);
}
} else {
+ afterConnEst(ep, fd, true);
+
// connection established immediately
- LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd;
- ep->conn_status_ = CONN_EST;
- ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
+ // LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd;
+ // ep->conn_status_ = CONN_EST;
- // poll for new msgs
- ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
- ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
+ // //ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
+ // ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
- asyncSendPendingMsg(ep);
- ep->cv_.notify_all();
+ // // poll for new msgs
+ // ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
+ // ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
+
+ // asyncSendPendingMsg(ep);
+ // ep->cv_.notify_all();
}
}
}
@@ -314,32 +318,21 @@ void NetworkThread::onConnEst(int fd) {
handleConnLost(ep->fd_[0], ep);
- switch(ep->conn_status_) {
- case CONN_INIT:
- case CONN_PENDING:
- return;
- default:
- break;
- }
+ if (ep->conn_status_ == CONN_EST && ep->conn_status_ == CONN_ERROR)
+ ep->cv_.notify_all();
} else {
- LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << ", fd = "<< fd;
- ep->conn_status_ = CONN_EST;
- // connect established; poll for new msgs
- ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
- ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
- ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
- }
+ afterConnEst(ep, fd, true);
- if (ep->conn_status_ == CONN_EST && ep->to_ack_.size() > 0)
- // if there are pending message, it means these msgs were sent over
- // previous sockets that have been lost now
- // we need to resend these msgs to the remote side
- asyncSendPendingMsg(ep);
+ //ep->conn_status_ = CONN_EST;
+ //// connect established; poll for new msgs
+ //ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
+ //ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
- // Finally notify all waiting threads
- ep->cv_.notify_all();
+ //ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
+ //ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
+ }
}
void NetworkThread::onNewConn() {
@@ -363,35 +356,93 @@ void NetworkThread::onNewConn() {
ep = epf_->getOrCreateEp(a);
std::unique_lock<std::mutex> lock(ep->mtx_);
- if (ep->fd_[1] >= 0) {
- // the previous connection is lost
- handleConnLost(ep->fd_[1], ep, false);
+ // Passive connection
+ afterConnEst(ep, fd, false);
+
+ // This should not be put before afterConnEst otherwise it may have no
+ // effect
+ fd_ip_map_[fd] = a;
+
+ // record the remote address
+ bcopy(&addr, &ep->addr_, len);
+}
+
+/**
+ * @brief The processing for a connected socket
+ *
+ * @param ep
+ * @param fd
+ * @param active indicates whether this socket is locally initiated or not
+ */
+void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) {
+
+ if (active)
+ LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << ", fd = "<< fd;
+
+ int sfd;
+
+ if (active) {
+ ep->fd_[0] = fd;
+ sfd = ep->fd_[1];
+ } else {
+ if (ep->fd_[1] >= 0) {
+ // the previous connection is lost
+ handleConnLost(ep->fd_[1], ep, false);
+ }
+ ep->fd_[1] = fd;
+ sfd = ep->fd_[0];
}
- if (ep->fd_[0] == fd) {
- // this fd is reused
+ if (sfd == fd)
+ // this fd is a reuse of a previous socket fd
+ // so we first need to clean up the resource for that fd
handleConnLost(fd, ep, false);
- }
- fd_ip_map_[fd] = a;
+ // initialize io watchers and add the read watcher to the ev loop
ev_io_init(&fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
ev_io_start(loop_, &fd_rwatcher_map_[fd]);
- // record the remote address
- bcopy(&addr, &ep->addr_, len);
+ // stop watching the writable watcher if necessary
+ if (active)
+ ev_io_stop(loop_, &fd_wwatcher_map_[fd]);
+ ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
+
+ // see whether there is already an established connection for this fd
+ if (ep->conn_status_ == CONN_EST && sfd >= 0) {
+ // check if fd and sfd are associated with the same socket
+ struct sockaddr_in addr;
+ socklen_t len;
+ if (getsockname(fd, (struct sockaddr*)&addr, &len)) {
+ LOG(INFO) << "Unable to get local socket address: " << strerror(errno);
+ return;
+ }
- ep->conn_status_ = CONN_EST;
- ep->fd_[1] = fd;
+ // see whether the local address of fd is the same as the remote side
+ // of sfd, which has already been stored in ep->addr_
+ if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port == ep->addr_.sin_port) {
+ LOG(INFO) << fd << " and " << sfd << " are associated with the same socket";
+ } else {
+ // this socket is redundant, we close it manually
+ close(fd);
+ handleConnLost(fd, ep);
+ }
+ } else {
+ ep->pfd_ = fd; // set the primary fd
+ ep->conn_status_ = CONN_EST;
+ }
- if (ep->to_ack_.size() > 0)
- // see if there are any messages waiting for ack
- // if yes, resend them
- asyncSendPendingMsg(ep);
+ if (fd == ep->pfd_) {
+ this->asyncSendPendingMsg(ep);
+ }
- // this connection is initiaed by remote side,
- // so we dont need to notify the waiting thread
+ // Finally notify all waiting threads
+ // if this connection is initiated by remote side,
+ // we don't need to notify the waiting thread
// later threads wanting to send to this ep, however,
// are able to reuse this ep
+ if (active) {
+ ep->cv_.notify_all();
+ }
}
void NetworkThread::onSend(int fd) {
@@ -423,6 +474,9 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
LOG(INFO) << "There are " << ep->send_.size() << " to-send msgs, and " << ep->to_ack_.size() << " to-ack msgs";
+ if (ep->to_ack_.empty())
+ return;
+
while (!ep->send_.empty()) {
ep->to_ack_.push(ep->send_.front());
ep->send_.pop();
@@ -443,14 +497,16 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
*/
int NetworkThread::asyncSend(int fd) {
- if (fd_ip_map_.count(fd) == 0)
- return 0;
-
EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
std::unique_lock<std::mutex> ep_lock(ep->mtx_);
+ if (fd != ep->pfd_)
+ // we only send over the primary fd
+ return 0;
+
if (ep->conn_status_ != CONN_EST)
+ // This happens during reconnection
goto out;
while(!ep->send_.empty()) {
@@ -465,12 +521,10 @@ int NetworkThread::asyncSend(int fd) {
else
nbytes = write(fd, msg.msg_ + msg.processed_, msg.getSize() - msg.processed_);
- // LOG(INFO) << "Send " << nbytes << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
-
if (nbytes == -1) {
if (errno == EWOULDBLOCK) {
- ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
- ev_io_start(loop_, &fd_wwatcher_map_[fd]);
+ if (!ev_is_active(&fd_wwatcher_map_[fd]) && !ev_is_pending(&fd_wwatcher_map_[fd]))
+ ev_io_start(loop_, &fd_wwatcher_map_[fd]);
goto out;
} else {
// this connection is lost; reset the send status
@@ -480,14 +534,24 @@ int NetworkThread::asyncSend(int fd) {
}
} else
msg.processed_ += nbytes;
+
+ //std::size_t m, p;
+ //uint8_t type;
+ //uint32_t id;
+ //if (msg.msg_) {
+ // readInteger(msg.msg_, type, id, m, p);
+ // LOG(INFO) << "Send " << msg.processed_ << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd << " for the current DATA MSG " << msg.id_ << ", " << id << ", " << m << ", " << p;
+ //}
}
CHECK(msg.processed_ == msg.getSize());
if (msg.type_ != MSG_ACK) {
+ LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << msg.getSize();;
msg.processed_ = 0;
ep->to_ack_.push(&msg);
} else {
+ //LOG(INFO) << "Send an ACK message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_;
delete &msg;
}
@@ -497,7 +561,7 @@ int NetworkThread::asyncSend(int fd) {
// if (ep->retry_cnt_ == 0) {
// LOG(INFO) << "Disconnect with Endpoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
// close(fd);
- // handleConnLost(fd, ep);
+ // goto err;
// }
}
out:
@@ -566,7 +630,8 @@ void NetworkThread::onRecv(int fd) {
// got the whole metadata;
readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_);
- // LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_;
+
+ //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
}
// start reading the real data
@@ -589,6 +654,9 @@ void NetworkThread::onRecv(int fd) {
}
msg.processed_ += nread;
+
+ //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << ", processed_ = " << msg.processed_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
+
if (msg.processed_ == msg.getSize()) {
LOG(INFO) << "Receive a " << msg.processed_ << " bytes DATA message from " << inet_ntoa(ep->addr_.sin_addr) << " with id " << msg.id_;
ep->recv_.push(new Message(static_cast<Message&&>(msg)));
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 044d65a..3bfd36c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -9,9 +9,14 @@ IF(NOT USE_OPENCL)
LIST(REMOVE_ITEM singa_test_source "singa/test_opencl.cc")
ENDIF()
-ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source})
-ADD_DEPENDENCIES(test_singa singa_core singa_utils)
-MESSAGE(STATUS "link libs" ${singa_linker_libs})
-TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model
+#ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source})
+#ADD_DEPENDENCIES(test_singa singa_core singa_utils)
+#MESSAGE(STATUS "link libs" ${singa_linker_libs})
+#TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model
+# singa_io proto protobuf ${SINGA_LINKER_LIBS})
+#SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread")
+
+ADD_EXECUTABLE(test_ep "singa/test_ep.cc")
+ADD_DEPENDENCIES(test_ep singa_io)
+TARGET_LINK_LIBRARIES(test_ep singa_core singa_utils singa_model
singa_io proto protobuf ${SINGA_LINKER_LIBS})
-SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread")
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/test/singa/test_ep.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_ep.cc b/test/singa/test_ep.cc
index 2435f28..e6d3e07 100644
--- a/test/singa/test_ep.cc
+++ b/test/singa/test_ep.cc
@@ -6,13 +6,13 @@
#include "singa/utils/logging.h"
-#define SIZE 100
+#define SIZE 10000000
#define PORT 10000
using namespace singa;
int main(int argc, char** argv) {
- char md[SIZE];
- char payload[SIZE];
+ char* md = new char[SIZE];
+ char* payload = new char[SIZE];
char* host = "localhost";
int port = PORT;
@@ -45,7 +45,7 @@ int main(int argc, char** argv) {
int cnt = 0;
- while(ep && cnt++ <= 100 && ep->send(m) > 0 ) {
+ while(ep && cnt++ <= 5 && ep->send(m) > 0 ) {
LOG(INFO) << "Send a " << m->getSize() << " bytes message";