You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by wa...@apache.org on 2016/08/10 05:30:40 UTC

[4/7] incubator-singa git commit: SINGA-233 Fix bugs in sending large messages

SINGA-233 Fix bugs in sending large messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d308e06d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d308e06d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d308e06d

Branch: refs/heads/dev
Commit: d308e06d6478203b82a0857efab9ce26e3375b09
Parents: 90a1cd5
Author: caiqc <ca...@comp.nus.edu.sg>
Authored: Fri Aug 5 00:03:24 2016 +0800
Committer: caiqc <ca...@comp.nus.edu.sg>
Committed: Wed Aug 10 09:42:46 2016 +0800

----------------------------------------------------------------------
 include/singa/io/network/endpoint.h |   5 +
 src/io/network/endpoint.cc          | 178 +++++++++++++++++++++----------
 test/CMakeLists.txt                 |  15 ++-
 test/singa/test_ep.cc               |   8 +-
 4 files changed, 142 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/include/singa/io/network/endpoint.h
----------------------------------------------------------------------
diff --git a/include/singa/io/network/endpoint.h b/include/singa/io/network/endpoint.h
index 1079fcc..063ca11 100644
--- a/include/singa/io/network/endpoint.h
+++ b/include/singa/io/network/endpoint.h
@@ -50,6 +50,9 @@ namespace singa {
 
 #define MAX_RETRY_CNT 3
 
+#define EV_WATCHER_STOP 0
+#define EV_WATCHER_START 1
+
 class NetworkThread;
 class EndPointFactory;
 
@@ -62,6 +65,7 @@ class EndPoint {
         std::mutex mtx_;
         struct sockaddr_in addr_;
         int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other
+        int pfd_ = -1;
         int conn_status_ = CONN_INIT;
         int pending_cnt_ = 0;
         int retry_cnt_ = 0;
@@ -113,6 +117,7 @@ class NetworkThread{
         void doWork();
         int asyncSend(int);
         void asyncSendPendingMsg(EndPoint*);
+        void afterConnEst(EndPoint* ep, int fd, bool active);
     public:
         EndPointFactory* epf_;
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/src/io/network/endpoint.cc
----------------------------------------------------------------------
diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc
index 7fe72b3..a74f5c2 100644
--- a/src/io/network/endpoint.cc
+++ b/src/io/network/endpoint.cc
@@ -283,17 +283,21 @@ void NetworkThread::onNewEp() {
                     ev_io_start(this->loop_, &this->fd_wwatcher_map_[fd]);
                 }
             } else {
+                afterConnEst(ep, fd, true);
+
                 // connection established immediately
-                LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd;
-                ep->conn_status_ = CONN_EST;
-                ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
+                // LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd;
+                // ep->conn_status_ = CONN_EST;
 
-                // poll for new msgs
-                ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
-                ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
+                // //ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
+                // ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
 
-                asyncSendPendingMsg(ep);
-                ep->cv_.notify_all();
+                // // poll for new msgs
+                // ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
+                // ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
+
+                // asyncSendPendingMsg(ep);
+                // ep->cv_.notify_all();
             }
         }
     }
@@ -314,32 +318,21 @@ void NetworkThread::onConnEst(int fd) {
 
         handleConnLost(ep->fd_[0], ep);
 
-        switch(ep->conn_status_) {
-            case CONN_INIT:
-            case CONN_PENDING:
-                return;
-            default:
-                break;
-        }
+        if (ep->conn_status_ == CONN_EST && ep->conn_status_ == CONN_ERROR)
+                ep->cv_.notify_all();
 
     } else {
-        LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << ", fd = "<< fd;
-        ep->conn_status_ = CONN_EST;
-        // connect established; poll for new msgs
-        ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
 
-        ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
-        ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
-    }
+        afterConnEst(ep, fd, true);
 
-    if (ep->conn_status_ == CONN_EST && ep->to_ack_.size() > 0)
-        // if there are pending message, it means these msgs were sent over
-        // previous sockets that have been lost now
-        // we need to resend these msgs to the remote side
-        asyncSendPendingMsg(ep);
+        //ep->conn_status_ = CONN_EST;
+        //// connect established; poll for new msgs
+        //ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]);
+        //ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
 
-    // Finally notify all waiting threads
-    ep->cv_.notify_all();
+        //ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
+        //ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]);
+    }
 }
 
 void NetworkThread::onNewConn() {
@@ -363,35 +356,93 @@ void NetworkThread::onNewConn() {
     ep = epf_->getOrCreateEp(a); 
     std::unique_lock<std::mutex> lock(ep->mtx_);
 
-    if (ep->fd_[1] >= 0) {
-        // the previous connection is lost
-        handleConnLost(ep->fd_[1], ep, false);
+    // Passive connection
+    afterConnEst(ep, fd, false);
+
+    // This should not be put before afterConnEst otherwise it may have no
+    // effect
+    fd_ip_map_[fd] = a;
+
+    // record the remote address
+    bcopy(&addr, &ep->addr_, len);
+}
+
+/**
+ * @brief The processing for a connected socket
+ *
+ * @param ep
+ * @param fd
+ * @param active indicate whethen this socket is locally initiated or not
+ */
+void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) {
+
+    if (active)
+        LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << ", fd = "<< fd;
+
+    int sfd;
+
+    if (active) {
+        ep->fd_[0] = fd;
+        sfd = ep->fd_[1];
+    } else {
+        if (ep->fd_[1] >= 0) {
+            // the previous connection is lost
+            handleConnLost(ep->fd_[1], ep, false);
+        }
+        ep->fd_[1] = fd;
+        sfd = ep->fd_[0];
     }
 
-    if (ep->fd_[0] == fd) {
-        // this fd is reused
+    if (sfd == fd)
+        // this fd is a reuse of a previous socket fd
+        // so we first need to clean the resouce for that fd
         handleConnLost(fd, ep, false);
-    }
 
-    fd_ip_map_[fd] = a;
+    // initialize io watchers and add the read watcher to the ev loop
     ev_io_init(&fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
     ev_io_start(loop_, &fd_rwatcher_map_[fd]);
 
-    // record the remote address
-    bcopy(&addr, &ep->addr_, len);
+    // stop watching the writable watcher if necessary
+    if (active)
+        ev_io_stop(loop_, &fd_wwatcher_map_[fd]);
+    ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
+
+    // see whether there is already a established connection for this fd
+    if (ep->conn_status_ == CONN_EST && sfd >= 0) {
+        // check if fd and sfd are associate with the same socket
+        struct sockaddr_in addr;
+        socklen_t len;
+        if (getsockname(fd, (struct sockaddr*)&addr, &len)) {
+            LOG(INFO) << "Unable to get local socket address: " << strerror(errno);
+            return;
+        }
 
-    ep->conn_status_ = CONN_EST;
-    ep->fd_[1] = fd;
+        // see whether the local address of fd is the same as the remote side
+        // of sfd, which has already been stored in ep->addr_
+        if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port == ep->addr_.sin_port) {
+            LOG(INFO) << fd << " and " << sfd << " are associated with the same socket";
+        } else {
+            // this socket is redundant, we close it maunally
+            close(fd);
+            handleConnLost(fd, ep);
+        }
+    } else {
+        ep->pfd_ = fd; // set the primary fd
+        ep->conn_status_ = CONN_EST;
+    }
 
-    if (ep->to_ack_.size() > 0)
-        // see if there are any messages waiting for ack
-        // if yes, resend them
-        asyncSendPendingMsg(ep);
+    if (fd == ep->pfd_) {
+        this->asyncSendPendingMsg(ep);
+    }
 
-    // this connection is initiaed by remote side, 
-    // so we dont need to notify the waiting thread
+    // Finally notify all waiting threads
+    // if this connection is initiaed by remote side, 
+    // we dont need to notify the waiting thread
     // later threads wanting to send to this ep, however,
     // are able to reuse this ep
+    if (active) {
+        ep->cv_.notify_all();
+    }
 }
 
 void NetworkThread::onSend(int fd) {
@@ -423,6 +474,9 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
 
     LOG(INFO) << "There are " << ep->send_.size() << " to-send msgs, and " << ep->to_ack_.size() << " to-ack msgs";
 
+    if (ep->to_ack_.empty())
+        return;
+
     while (!ep->send_.empty()) {
         ep->to_ack_.push(ep->send_.front());
         ep->send_.pop();
@@ -443,14 +497,16 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
  */
 int NetworkThread::asyncSend(int fd) {
 
-    if (fd_ip_map_.count(fd) == 0)
-        return 0;
-
     EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
 
     std::unique_lock<std::mutex> ep_lock(ep->mtx_);
 
+    if (fd != ep->pfd_)
+        // we only send over the primary fd
+        return 0;
+
     if (ep->conn_status_ != CONN_EST)
+        // This happens during reconnection
         goto out;
 
     while(!ep->send_.empty()) {
@@ -465,12 +521,10 @@ int NetworkThread::asyncSend(int fd) {
             else
                 nbytes = write(fd, msg.msg_ + msg.processed_, msg.getSize() - msg.processed_);
 
-            // LOG(INFO) << "Send " << nbytes << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
-
             if (nbytes == -1) {
                 if (errno == EWOULDBLOCK) {
-                    ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
-                    ev_io_start(loop_, &fd_wwatcher_map_[fd]);
+                    if (!ev_is_active(&fd_wwatcher_map_[fd]) && !ev_is_pending(&fd_wwatcher_map_[fd]))
+                        ev_io_start(loop_, &fd_wwatcher_map_[fd]);
                     goto out;
                 } else {
                     // this connection is lost; reset the send status
@@ -480,14 +534,24 @@ int NetworkThread::asyncSend(int fd) {
                 }
             } else 
                 msg.processed_ += nbytes;
+
+            //std::size_t m, p;
+            //uint8_t type; 
+            //uint32_t id;
+            //if (msg.msg_) {
+            //    readInteger(msg.msg_, type, id, m, p);
+            //    LOG(INFO) << "Send " << msg.processed_ << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd << " for the current DATA MSG " << msg.id_ << ", " << id << ", " << m << ", " << p;
+            //}
         }
 
         CHECK(msg.processed_ == msg.getSize());
 
         if (msg.type_ != MSG_ACK) {
+            LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << msg.getSize();;
             msg.processed_ = 0;
             ep->to_ack_.push(&msg);
         } else {
+            //LOG(INFO) << "Send an ACK message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_;
             delete &msg;
         }
 
@@ -497,7 +561,7 @@ int NetworkThread::asyncSend(int fd) {
         // if (ep->retry_cnt_ == 0) {
         //     LOG(INFO) << "Disconnect with Endpoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
         //     close(fd);
-        //     handleConnLost(fd, ep);
+        //     goto err;
         // }
     }
 out:
@@ -566,7 +630,8 @@ void NetworkThread::onRecv(int fd) {
 
             // got the whole metadata; 
             readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_);
-            // LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_;
+
+            //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
         }
 
         // start reading the real data
@@ -589,6 +654,9 @@ void NetworkThread::onRecv(int fd) {
         }
 
         msg.processed_ += nread;
+
+        //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << ", processed_ = " << msg.processed_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
+        
         if (msg.processed_ == msg.getSize()) {
             LOG(INFO) << "Receive a " << msg.processed_ << " bytes DATA message from " << inet_ntoa(ep->addr_.sin_addr) << " with id " << msg.id_;
             ep->recv_.push(new Message(static_cast<Message&&>(msg)));

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 044d65a..3bfd36c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -9,9 +9,14 @@ IF(NOT USE_OPENCL)
     LIST(REMOVE_ITEM singa_test_source "singa/test_opencl.cc")
 ENDIF()
 
-ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source})
-ADD_DEPENDENCIES(test_singa singa_core singa_utils)
-MESSAGE(STATUS "link libs" ${singa_linker_libs})
-TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model
+#ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source})
+#ADD_DEPENDENCIES(test_singa singa_core singa_utils)
+#MESSAGE(STATUS "link libs" ${singa_linker_libs})
+#TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model
+#    singa_io proto protobuf ${SINGA_LINKER_LIBS})
+#SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread")
+
+ADD_EXECUTABLE(test_ep "singa/test_ep.cc")
+ADD_DEPENDENCIES(test_ep singa_io)
+TARGET_LINK_LIBRARIES(test_ep singa_core singa_utils singa_model
     singa_io proto protobuf ${SINGA_LINKER_LIBS})
-SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread")

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/test/singa/test_ep.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_ep.cc b/test/singa/test_ep.cc
index 2435f28..e6d3e07 100644
--- a/test/singa/test_ep.cc
+++ b/test/singa/test_ep.cc
@@ -6,13 +6,13 @@
 
 #include "singa/utils/logging.h"
 
-#define SIZE 100
+#define SIZE 10000000
 #define PORT 10000
 
 using namespace singa;
 int main(int argc, char** argv) {
-    char md[SIZE];
-    char payload[SIZE];
+    char* md = new char[SIZE];
+    char* payload = new char[SIZE];
 
     char* host = "localhost";
     int port = PORT;
@@ -45,7 +45,7 @@ int main(int argc, char** argv) {
 
     int cnt = 0;
 
-    while(ep && cnt++ <= 100 && ep->send(m) > 0 ) {
+    while(ep && cnt++ <= 5 && ep->send(m) > 0 ) {
 
         LOG(INFO) << "Send a " << m->getSize() << " bytes message";