You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by ma...@apache.org on 2016/09/14 22:25:45 UTC

incubator-mynewt-core git commit: mn_socket for native; TCP datapath.

Repository: incubator-mynewt-core
Updated Branches:
  refs/heads/develop f7ecdb1fc -> 3204c603b


mn_socket for native; TCP datapath.


Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/commit/3204c603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/tree/3204c603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/diff/3204c603

Branch: refs/heads/develop
Commit: 3204c603b5b7a003ca1c459d6d2419462d315367
Parents: f7ecdb1
Author: Marko Kiiskila <ma...@runtime.io>
Authored: Wed Sep 14 15:25:14 2016 -0700
Committer: Marko Kiiskila <ma...@runtime.io>
Committed: Wed Sep 14 15:25:14 2016 -0700

----------------------------------------------------------------------
 sys/mn_socket/src/arch/sim/native_sock.c | 104 ++++++++++++++++++++----
 sys/mn_socket/src/test/mn_sock_test.c    | 112 ++++++++++++++++++++++++++
 2 files changed, 199 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/blob/3204c603/sys/mn_socket/src/arch/sim/native_sock.c
----------------------------------------------------------------------
diff --git a/sys/mn_socket/src/arch/sim/native_sock.c b/sys/mn_socket/src/arch/sim/native_sock.c
index d21438e..f305201 100644
--- a/sys/mn_socket/src/arch/sim/native_sock.c
+++ b/sys/mn_socket/src/arch/sim/native_sock.c
@@ -87,10 +87,14 @@ static struct native_sock *
 native_get_sock(void)
 {
     int i;
+    struct native_sock *ns;
 
     for (i = 0; i < NATIVE_SOCK_MAX; i++) {
         if (native_socks[i].ns_fd < 0) {
-            return &native_socks[i];
+            ns = &native_socks[i];
+            ns->ns_poll = 0;
+            ns->ns_listen = 0;
+            return ns;
         }
     }
     return NULL;
@@ -249,8 +253,6 @@ native_sock_create(struct mn_socket **sp, uint8_t domain,
     idx = socket(domain, type, proto);
     ns->ns_fd = idx;
     ns->ns_type = type;
-    ns->ns_poll = 0;
-    ns->ns_listen = 0;
     os_mutex_release(&nss->mtx);
     if (idx < 0) {
         return MN_ENOBUFS;
@@ -371,6 +373,53 @@ native_sock_listen(struct mn_socket *s, uint8_t qlen)
     return 0;
 }
 
+/*
+ * TX routine for stream sockets (TCP). The data to send is pointed
+ * by ns_tx.
+ * Keep sending mbufs until socket says that it can't take anymore.
+ * then wait for send event notification before continuing.
+ */
+static int
+native_sock_stream_tx(struct native_sock *ns, int notify)
+{
+    struct native_sock_state *nss = &native_sock_state;
+    struct os_mbuf *m;
+    struct os_mbuf *n;
+    int rc;
+
+    rc = 0;
+
+    os_mutex_pend(&nss->mtx, OS_TIMEOUT_NEVER);
+    while (ns->ns_tx && rc == 0) {
+        m = ns->ns_tx;
+        n = SLIST_NEXT(m, om_next);
+        rc = write(ns->ns_fd, m->om_data, m->om_len);
+        if (rc == m->om_len) {
+            ns->ns_tx = n;
+            os_mbuf_free(m);
+            rc = 0;
+        } else {
+            rc = errno;
+        }
+    }
+    os_mutex_release(&nss->mtx);
+    if (rc) {
+        if (rc == EAGAIN) {
+            rc = 0;
+        } else {
+            rc = native_sock_err_to_mn_err(rc);
+        }
+    }
+    if (notify) {
+        if (ns->ns_tx == NULL) {
+            mn_socket_writable(&ns->ns_sock, 0);
+        } else {
+            mn_socket_writable(&ns->ns_sock, rc);
+        }
+    }
+    return rc;
+}
+
 static int
 native_sock_sendto(struct mn_socket *s, struct os_mbuf *m,
   struct mn_sockaddr *addr)
@@ -402,8 +451,15 @@ native_sock_sendto(struct mn_socket *s, struct os_mbuf *m,
         }
         os_mbuf_free_chain(m);
         return 0;
+    } else {
+        if (ns->ns_tx) {
+            return MN_EAGAIN;
+        }
+        ns->ns_tx = m;
+
+        rc = native_sock_stream_tx(ns, 0);
+        return rc;
     }
-    return MN_EINVAL;
 }
 
 static int
@@ -418,23 +474,35 @@ native_sock_recvfrom(struct mn_socket *s, struct os_mbuf **mp,
     socklen_t slen;
     int rc;
 
+    slen = sizeof(ss);
     if (ns->ns_type == SOCK_DGRAM) {
-        slen = sizeof(ss);
         rc = recvfrom(ns->ns_fd, tmpbuf, sizeof(tmpbuf), 0, sa, &slen);
-        if (rc < 0) {
-            return native_sock_err_to_mn_err(errno);
+    } else {
+        rc = getpeername(ns->ns_fd, sa, &slen);
+        if (rc == 0) {
+            rc = read(ns->ns_fd, tmpbuf, sizeof(tmpbuf));
         }
+    }
+    if (rc < 0) {
+        return native_sock_err_to_mn_err(errno);
+    }
+    if (ns->ns_type == SOCK_STREAM && rc == 0) {
+        mn_socket_readable(&ns->ns_sock, MN_ECONNABORTED);
+        ns->ns_poll = 0;
+        native_sock_poll_rebuild(&native_sock_state);
+        return MN_ECONNABORTED;
+    }
 
-        m = os_msys_get_pkthdr(rc, 0);
-        if (!m) {
-            return MN_ENOBUFS;
-        }
-        os_mbuf_copyinto(m, 0, tmpbuf, rc);
-        *mp = m;
+    m = os_msys_get_pkthdr(rc, 0);
+    if (!m) {
+        return MN_ENOBUFS;
+    }
+    os_mbuf_copyinto(m, 0, tmpbuf, rc);
+    *mp = m;
+    if (addr) {
         native_sock_addr_to_mn_addr(sa, addr);
-        return 0;
     }
-    return MN_EINVAL;
+    return 0;
 }
 
 static int
@@ -472,7 +540,9 @@ socket_task(void *arg)
     socklen_t slen;
     int rc;
 
+    os_mutex_pend(&nss->mtx, OS_WAIT_FOREVER);
     while (1) {
+        os_mutex_release(&nss->mtx);
         os_time_delay(NATIVE_SOCK_POLL_ITVL);
         os_mutex_pend(&nss->mtx, OS_WAIT_FOREVER);
         if (nss->poll_fd_cnt) {
@@ -502,7 +572,6 @@ socket_task(void *arg)
                 if (new_ns->ns_fd < 0) {
                     continue;
                 }
-                new_ns->ns_poll = 1;
                 new_ns->ns_type = ns->ns_type;
                 new_ns->ns_sock.ms_ops = &native_sock_ops;
                 os_mutex_release(&nss->mtx);
@@ -512,11 +581,12 @@ socket_task(void *arg)
                      */
                 }
                 os_mutex_pend(&nss->mtx, OS_WAIT_FOREVER);
+                new_ns->ns_poll = 1;
+                native_sock_poll_rebuild(nss);
             } else {
                 mn_socket_readable(&ns->ns_sock, 0);
             }
         }
-        os_mutex_release(&nss->mtx);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/blob/3204c603/sys/mn_socket/src/test/mn_sock_test.c
----------------------------------------------------------------------
diff --git a/sys/mn_socket/src/test/mn_sock_test.c b/sys/mn_socket/src/test/mn_sock_test.c
index f8874e0..acb1dd7 100644
--- a/sys/mn_socket/src/test/mn_sock_test.c
+++ b/sys/mn_socket/src/test/mn_sock_test.c
@@ -326,12 +326,124 @@ sock_udp_data(void)
 }
 
 void
+std_writable(void *cb_arg, int err)
+{
+    int *i;
+
+    TEST_ASSERT(err == 0);
+    i = (int *)cb_arg;
+    if (i) {
+        *i = *i + 1;
+    }
+}
+
+void
+std_readable(void *cb_arg, int err)
+{
+    os_sem_release(&test_sem);
+}
+
+static union mn_socket_cb sud_sock_cbs = {
+    .socket.writable = std_writable,
+    .socket.readable = std_readable
+};
+
+int
+std_newconn(void *cb_arg, struct mn_socket *new)
+{
+    struct mn_socket **r_sock;
+
+    r_sock = cb_arg;
+    *r_sock = new;
+
+    mn_socket_set_cbs(new, NULL, &sud_sock_cbs);
+
+    os_sem_release(&test_sem);
+    return 0;
+}
+
+void
+sock_tcp_data(void)
+{
+    struct mn_socket *listen_sock;
+    struct mn_socket *sock;
+    struct mn_sockaddr_in msin;
+    int rc;
+    union mn_socket_cb listen_cbs = {
+        .listen.newconn = std_newconn,
+    };
+    int connected = 0;
+    struct mn_socket *new_sock = NULL;
+    struct os_mbuf *m;
+    char data[] = "1234567890";
+
+    rc = mn_socket(&listen_sock, MN_PF_INET, MN_SOCK_STREAM, 0);
+    TEST_ASSERT(rc == 0);
+
+    msin.msin_family = MN_PF_INET;
+    msin.msin_len = sizeof(msin);
+    msin.msin_port = htons(12447);
+
+    mn_inet_pton(MN_PF_INET, "127.0.0.1", &msin.msin_addr);
+
+    mn_socket_set_cbs(listen_sock, &new_sock, &listen_cbs);
+    rc = mn_bind(listen_sock, (struct mn_sockaddr *)&msin);
+    TEST_ASSERT(rc == 0);
+
+    rc = mn_listen(listen_sock, 2);
+    TEST_ASSERT(rc == 0);
+
+    rc = mn_socket(&sock, MN_PF_INET, MN_SOCK_STREAM, 0);
+    TEST_ASSERT(rc == 0);
+
+    mn_socket_set_cbs(sock, &connected, &sud_sock_cbs);
+
+    rc = mn_connect(sock, (struct mn_sockaddr *)&msin);
+    TEST_ASSERT(rc == 0);
+
+    rc = os_sem_pend(&test_sem, OS_TICKS_PER_SEC);
+    TEST_ASSERT(rc == 0);
+    TEST_ASSERT(connected == 1);
+    TEST_ASSERT(new_sock != NULL);
+
+    m = os_msys_get(sizeof(data), 0);
+    TEST_ASSERT(m);
+    rc = os_mbuf_copyinto(m, 0, data, sizeof(data));
+    TEST_ASSERT(rc == 0);
+    rc = mn_sendto(new_sock, (struct os_mbuf *)m, (struct mn_sockaddr *)&msin);
+    TEST_ASSERT(rc == 0);
+
+    /*
+     * Wait for the packet.
+     */
+    rc = os_sem_pend(&test_sem, OS_TICKS_PER_SEC);
+    TEST_ASSERT(rc == 0);
+
+    memset(&msin, 0, sizeof(msin));
+    rc = mn_recvfrom(sock, &m, (struct mn_sockaddr *)&msin);
+    TEST_ASSERT(rc == 0);
+    TEST_ASSERT(m != NULL);
+    TEST_ASSERT(msin.msin_family == MN_AF_INET);
+    TEST_ASSERT(msin.msin_len == sizeof(msin));
+    TEST_ASSERT(msin.msin_port != 0);
+    TEST_ASSERT(msin.msin_addr != 0);
+    os_mbuf_free_chain(m);
+
+    if (new_sock) {
+        mn_close(new_sock);
+    }
+    mn_close(sock);
+    mn_close(listen_sock);
+}
+
+void
 mn_socket_test_handler(void *arg)
 {
     sock_open_close();
     sock_listen();
     sock_tcp_connect();
     sock_udp_data();
+    sock_tcp_data();
     os_test_restart();
 }