You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by ma...@apache.org on 2016/09/14 22:25:45 UTC
incubator-mynewt-core git commit: mn_socket for native; TCP datapath.
Repository: incubator-mynewt-core
Updated Branches:
refs/heads/develop f7ecdb1fc -> 3204c603b
mn_socket for native; TCP datapath.
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/commit/3204c603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/tree/3204c603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/diff/3204c603
Branch: refs/heads/develop
Commit: 3204c603b5b7a003ca1c459d6d2419462d315367
Parents: f7ecdb1
Author: Marko Kiiskila <ma...@runtime.io>
Authored: Wed Sep 14 15:25:14 2016 -0700
Committer: Marko Kiiskila <ma...@runtime.io>
Committed: Wed Sep 14 15:25:14 2016 -0700
----------------------------------------------------------------------
sys/mn_socket/src/arch/sim/native_sock.c | 104 ++++++++++++++++++++----
sys/mn_socket/src/test/mn_sock_test.c | 112 ++++++++++++++++++++++++++
2 files changed, 199 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/blob/3204c603/sys/mn_socket/src/arch/sim/native_sock.c
----------------------------------------------------------------------
diff --git a/sys/mn_socket/src/arch/sim/native_sock.c b/sys/mn_socket/src/arch/sim/native_sock.c
index d21438e..f305201 100644
--- a/sys/mn_socket/src/arch/sim/native_sock.c
+++ b/sys/mn_socket/src/arch/sim/native_sock.c
@@ -87,10 +87,14 @@ static struct native_sock *
native_get_sock(void)
{
int i;
+ struct native_sock *ns;
for (i = 0; i < NATIVE_SOCK_MAX; i++) {
if (native_socks[i].ns_fd < 0) {
- return &native_socks[i];
+ ns = &native_socks[i];
+ ns->ns_poll = 0;
+ ns->ns_listen = 0;
+ return ns;
}
}
return NULL;
@@ -249,8 +253,6 @@ native_sock_create(struct mn_socket **sp, uint8_t domain,
idx = socket(domain, type, proto);
ns->ns_fd = idx;
ns->ns_type = type;
- ns->ns_poll = 0;
- ns->ns_listen = 0;
os_mutex_release(&nss->mtx);
if (idx < 0) {
return MN_ENOBUFS;
@@ -371,6 +373,53 @@ native_sock_listen(struct mn_socket *s, uint8_t qlen)
return 0;
}
+/*
+ * TX routine for stream sockets (TCP). The data to send is pointed
+ * to by ns_tx.
+ * Keep sending mbufs until the socket says that it can't take any more,
+ * then wait for send event notification before continuing.
+ */
+static int
+native_sock_stream_tx(struct native_sock *ns, int notify)
+{
+ struct native_sock_state *nss = &native_sock_state;
+ struct os_mbuf *m;
+ struct os_mbuf *n;
+ int rc;
+
+ rc = 0;
+
+ os_mutex_pend(&nss->mtx, OS_TIMEOUT_NEVER);
+ while (ns->ns_tx && rc == 0) {
+ m = ns->ns_tx;
+ n = SLIST_NEXT(m, om_next);
+ rc = write(ns->ns_fd, m->om_data, m->om_len);
+ if (rc == m->om_len) {
+ ns->ns_tx = n;
+ os_mbuf_free(m);
+ rc = 0;
+ } else {
+ rc = errno;
+ }
+ }
+ os_mutex_release(&nss->mtx);
+ if (rc) {
+ if (rc == EAGAIN) {
+ rc = 0;
+ } else {
+ rc = native_sock_err_to_mn_err(rc);
+ }
+ }
+ if (notify) {
+ if (ns->ns_tx == NULL) {
+ mn_socket_writable(&ns->ns_sock, 0);
+ } else {
+ mn_socket_writable(&ns->ns_sock, rc);
+ }
+ }
+ return rc;
+}
+
static int
native_sock_sendto(struct mn_socket *s, struct os_mbuf *m,
struct mn_sockaddr *addr)
@@ -402,8 +451,15 @@ native_sock_sendto(struct mn_socket *s, struct os_mbuf *m,
}
os_mbuf_free_chain(m);
return 0;
+ } else {
+ if (ns->ns_tx) {
+ return MN_EAGAIN;
+ }
+ ns->ns_tx = m;
+
+ rc = native_sock_stream_tx(ns, 0);
+ return rc;
}
- return MN_EINVAL;
}
static int
@@ -418,23 +474,35 @@ native_sock_recvfrom(struct mn_socket *s, struct os_mbuf **mp,
socklen_t slen;
int rc;
+ slen = sizeof(ss);
if (ns->ns_type == SOCK_DGRAM) {
- slen = sizeof(ss);
rc = recvfrom(ns->ns_fd, tmpbuf, sizeof(tmpbuf), 0, sa, &slen);
- if (rc < 0) {
- return native_sock_err_to_mn_err(errno);
+ } else {
+ rc = getpeername(ns->ns_fd, sa, &slen);
+ if (rc == 0) {
+ rc = read(ns->ns_fd, tmpbuf, sizeof(tmpbuf));
}
+ }
+ if (rc < 0) {
+ return native_sock_err_to_mn_err(errno);
+ }
+ if (ns->ns_type == SOCK_STREAM && rc == 0) {
+ mn_socket_readable(&ns->ns_sock, MN_ECONNABORTED);
+ ns->ns_poll = 0;
+ native_sock_poll_rebuild(&native_sock_state);
+ return MN_ECONNABORTED;
+ }
- m = os_msys_get_pkthdr(rc, 0);
- if (!m) {
- return MN_ENOBUFS;
- }
- os_mbuf_copyinto(m, 0, tmpbuf, rc);
- *mp = m;
+ m = os_msys_get_pkthdr(rc, 0);
+ if (!m) {
+ return MN_ENOBUFS;
+ }
+ os_mbuf_copyinto(m, 0, tmpbuf, rc);
+ *mp = m;
+ if (addr) {
native_sock_addr_to_mn_addr(sa, addr);
- return 0;
}
- return MN_EINVAL;
+ return 0;
}
static int
@@ -472,7 +540,9 @@ socket_task(void *arg)
socklen_t slen;
int rc;
+ os_mutex_pend(&nss->mtx, OS_WAIT_FOREVER);
while (1) {
+ os_mutex_release(&nss->mtx);
os_time_delay(NATIVE_SOCK_POLL_ITVL);
os_mutex_pend(&nss->mtx, OS_WAIT_FOREVER);
if (nss->poll_fd_cnt) {
@@ -502,7 +572,6 @@ socket_task(void *arg)
if (new_ns->ns_fd < 0) {
continue;
}
- new_ns->ns_poll = 1;
new_ns->ns_type = ns->ns_type;
new_ns->ns_sock.ms_ops = &native_sock_ops;
os_mutex_release(&nss->mtx);
@@ -512,11 +581,12 @@ socket_task(void *arg)
*/
}
os_mutex_pend(&nss->mtx, OS_WAIT_FOREVER);
+ new_ns->ns_poll = 1;
+ native_sock_poll_rebuild(nss);
} else {
mn_socket_readable(&ns->ns_sock, 0);
}
}
- os_mutex_release(&nss->mtx);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-core/blob/3204c603/sys/mn_socket/src/test/mn_sock_test.c
----------------------------------------------------------------------
diff --git a/sys/mn_socket/src/test/mn_sock_test.c b/sys/mn_socket/src/test/mn_sock_test.c
index f8874e0..acb1dd7 100644
--- a/sys/mn_socket/src/test/mn_sock_test.c
+++ b/sys/mn_socket/src/test/mn_sock_test.c
@@ -326,12 +326,124 @@ sock_udp_data(void)
}
void
+std_writable(void *cb_arg, int err)
+{
+ int *i;
+
+ TEST_ASSERT(err == 0);
+ i = (int *)cb_arg;
+ if (i) {
+ *i = *i + 1;
+ }
+}
+
+void
+std_readable(void *cb_arg, int err)
+{
+ os_sem_release(&test_sem);
+}
+
+static union mn_socket_cb sud_sock_cbs = {
+ .socket.writable = std_writable,
+ .socket.readable = std_readable
+};
+
+int
+std_newconn(void *cb_arg, struct mn_socket *new)
+{
+ struct mn_socket **r_sock;
+
+ r_sock = cb_arg;
+ *r_sock = new;
+
+ mn_socket_set_cbs(new, NULL, &sud_sock_cbs);
+
+ os_sem_release(&test_sem);
+ return 0;
+}
+
+void
+sock_tcp_data(void)
+{
+ struct mn_socket *listen_sock;
+ struct mn_socket *sock;
+ struct mn_sockaddr_in msin;
+ int rc;
+ union mn_socket_cb listen_cbs = {
+ .listen.newconn = std_newconn,
+ };
+ int connected = 0;
+ struct mn_socket *new_sock = NULL;
+ struct os_mbuf *m;
+ char data[] = "1234567890";
+
+ rc = mn_socket(&listen_sock, MN_PF_INET, MN_SOCK_STREAM, 0);
+ TEST_ASSERT(rc == 0);
+
+ msin.msin_family = MN_PF_INET;
+ msin.msin_len = sizeof(msin);
+ msin.msin_port = htons(12447);
+
+ mn_inet_pton(MN_PF_INET, "127.0.0.1", &msin.msin_addr);
+
+ mn_socket_set_cbs(listen_sock, &new_sock, &listen_cbs);
+ rc = mn_bind(listen_sock, (struct mn_sockaddr *)&msin);
+ TEST_ASSERT(rc == 0);
+
+ rc = mn_listen(listen_sock, 2);
+ TEST_ASSERT(rc == 0);
+
+ rc = mn_socket(&sock, MN_PF_INET, MN_SOCK_STREAM, 0);
+ TEST_ASSERT(rc == 0);
+
+ mn_socket_set_cbs(sock, &connected, &sud_sock_cbs);
+
+ rc = mn_connect(sock, (struct mn_sockaddr *)&msin);
+ TEST_ASSERT(rc == 0);
+
+ rc = os_sem_pend(&test_sem, OS_TICKS_PER_SEC);
+ TEST_ASSERT(rc == 0);
+ TEST_ASSERT(connected == 1);
+ TEST_ASSERT(new_sock != NULL);
+
+ m = os_msys_get(sizeof(data), 0);
+ TEST_ASSERT(m);
+ rc = os_mbuf_copyinto(m, 0, data, sizeof(data));
+ TEST_ASSERT(rc == 0);
+ rc = mn_sendto(new_sock, (struct os_mbuf *)m, (struct mn_sockaddr *)&msin);
+ TEST_ASSERT(rc == 0);
+
+ /*
+ * Wait for the packet.
+ */
+ rc = os_sem_pend(&test_sem, OS_TICKS_PER_SEC);
+ TEST_ASSERT(rc == 0);
+
+ memset(&msin, 0, sizeof(msin));
+ rc = mn_recvfrom(sock, &m, (struct mn_sockaddr *)&msin);
+ TEST_ASSERT(rc == 0);
+ TEST_ASSERT(m != NULL);
+ TEST_ASSERT(msin.msin_family == MN_AF_INET);
+ TEST_ASSERT(msin.msin_len == sizeof(msin));
+ TEST_ASSERT(msin.msin_port != 0);
+ TEST_ASSERT(msin.msin_addr != 0);
+ os_mbuf_free_chain(m);
+
+ if (new_sock) {
+ mn_close(new_sock);
+ }
+ mn_close(sock);
+ mn_close(listen_sock);
+}
+
+void
mn_socket_test_handler(void *arg)
{
sock_open_close();
sock_listen();
sock_tcp_connect();
sock_udp_data();
+ sock_tcp_data();
os_test_restart();
}