You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/01/04 18:03:17 UTC
[1/2] qpid-proton git commit: PROTON-1706: pn_listener_t provide
access to actual listening port
Repository: qpid-proton
Updated Branches:
refs/heads/master 223e6d012 -> 4dfe29692
PROTON-1706: pn_listener_t provide access to actual listening port
pn_netaddr_listening() provides list of pn_netaddr_t addresses for a listener.
Implemented in all proactors: epoll, libuv, iocp
Updated C tests to listen on port 0 for safe, portable dynamic port allocation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7bc25c61
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7bc25c61
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7bc25c61
Branch: refs/heads/master
Commit: 7bc25c6117350f55572c0b628c9f9fd29d68c7d9
Parents: 223e6d0
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jan 4 10:28:36 2018 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jan 4 12:51:24 2018 -0500
----------------------------------------------------------------------
examples/c/broker.c | 13 +-
examples/c/direct.c | 13 +-
examples/c/example_test.py | 59 ++++-----
proton-c/CMakeLists.txt | 1 +
proton-c/include/proton/netaddr.h | 33 ++++-
proton-c/src/proactor/epoll.c | 49 +++----
proton-c/src/proactor/libuv.c | 51 ++++----
proton-c/src/proactor/netaddr-internal.h | 96 ++++++++++++++
proton-c/src/proactor/proactor-internal.c | 15 ++-
proton-c/src/proactor/proactor-internal.h | 6 +-
proton-c/src/proactor/win_iocp.c | 92 +++-----------
proton-c/src/tests/fdlimit.py | 12 +-
proton-c/src/tests/proactor.c | 169 +++++++++++++------------
proton-c/src/tests/test_port.h | 142 ---------------------
proton-c/src/tests/test_tools.h | 2 +-
15 files changed, 341 insertions(+), 412 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index c2efed8..dee8526 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -21,6 +21,7 @@
#include <proton/engine.h>
#include <proton/listener.h>
+#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/sasl.h>
#include <proton/ssl.h>
@@ -285,11 +286,13 @@ static void handle(broker_t* b, pn_event_t* e) {
switch (pn_event_type(e)) {
- case PN_LISTENER_OPEN:
- printf("listening\n");
- fflush(stdout);
- break;
-
+ case PN_LISTENER_OPEN: {
+ char port[256]; /* Get the listening port */
+ pn_netaddr_host_port(pn_netaddr_listening(pn_event_listener(e)), NULL, 0, port, sizeof(port));
+ printf("listening on %s\n", port);
+ fflush(stdout);
+ break;
+ }
case PN_LISTENER_ACCEPT: {
/* Configure a transport to allow SSL and SASL connections. See ssl_domain setup in main() */
pn_transport_t *t = pn_transport();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
index 3313ab2..1943dcc 100644
--- a/examples/c/direct.c
+++ b/examples/c/direct.c
@@ -24,6 +24,7 @@
#include <proton/delivery.h>
#include <proton/link.h>
#include <proton/listener.h>
+#include <proton/netaddr.h>
#include <proton/message.h>
#include <proton/proactor.h>
#include <proton/sasl.h>
@@ -228,11 +229,13 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
static bool handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
- case PN_LISTENER_OPEN:
- printf("listening\n");
- fflush(stdout);
- break;
-
+ case PN_LISTENER_OPEN: {
+ char port[256]; /* Get the listening port */
+ pn_netaddr_host_port(pn_netaddr_listening(pn_event_listener(event)), NULL, 0, port, sizeof(port));
+ printf("listening on %s\n", port);
+ fflush(stdout);
+ break;
+ }
case PN_LISTENER_ACCEPT:
pn_listener_accept2(pn_event_listener(event), NULL, NULL);
break;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
index 1e25c76..b6a5a4a 100644
--- a/examples/c/example_test.py
+++ b/examples/c/example_test.py
@@ -35,18 +35,18 @@ def receive_expect(n=MESSAGES): return receive_expect_messages(n)+receive_expect
def send_expect(n=MESSAGES): return "%s messages sent and acknowledged\n" % n
def send_abort_expect(n=MESSAGES): return "%s messages started and aborted\n" % n
+def wait_listening(proc):
+ m = proc.wait_re("listening on ([0-9]+)$")
+ return m.group(1), m.group(0)+"\n" # Return (port, line)
+
class Broker(object):
def __init__(self, test):
self.test = test
def __enter__(self):
- with TestPort() as tp:
- self.port = tp.port
- self.host = tp.host
- self.addr = tp.addr
- self.proc = self.test.proc(["broker", "", self.port])
- self.proc.wait_re("listening")
- return self
+ self.proc = self.test.proc(["broker", "", "0"])
+ self.port, _ = wait_listening(self.proc)
+ return self
def __exit__(self, *args):
b = getattr(self, "proc")
@@ -75,19 +75,17 @@ class CExampleTest(ProcTestCase):
def test_send_direct(self):
"""Send to direct server"""
- with TestPort() as tp:
- d = self.proc(["direct", "", tp.port])
- d.wait_re("listening")
- self.assertEqual(send_expect(), self.runex("send", tp.port))
- self.assertMultiLineEqual("listening\n"+receive_expect(), d.wait_exit())
+ d = self.proc(["direct", "", "0"])
+ port, line = wait_listening(d)
+ self.assertEqual(send_expect(), self.runex("send", port))
+ self.assertMultiLineEqual(line+receive_expect(), d.wait_exit())
def test_receive_direct(self):
"""Receive from direct server"""
- with TestPort() as tp:
- d = self.proc(["direct", "", tp.port])
- d.wait_re("listening")
- self.assertMultiLineEqual(receive_expect(), self.runex("receive", tp.port))
- self.assertEqual("listening\n10 messages sent and acknowledged\n", d.wait_exit())
+ d = self.proc(["direct", "", "0"])
+ port, line = wait_listening(d)
+ self.assertMultiLineEqual(receive_expect(), self.runex("receive", port))
+ self.assertEqual(line+"10 messages sent and acknowledged\n", d.wait_exit())
def test_send_abort_broker(self):
"""Sending aborted messages to a broker"""
@@ -101,20 +99,19 @@ class CExampleTest(ProcTestCase):
def test_send_abort_direct(self):
"""Send aborted messages to the direct server"""
- with TestPort() as tp:
- d = self.proc(["direct", "", tp.port, "examples", "20"])
- expect = "listening\n"
- d.wait_re(expect)
- self.assertEqual(send_expect(), self.runex("send", tp.port))
- expect += receive_expect_messages()
- d.wait_re(expect)
- self.assertEqual(send_abort_expect(), self.runex("send-abort", tp.port))
- expect += "Message aborted\n"*MESSAGES
- d.wait_re(expect)
- self.assertEqual(send_expect(), self.runex("send", tp.port))
- expect += receive_expect_messages()+receive_expect_total(20)
- self.maxDiff = None
- self.assertMultiLineEqual(expect, d.wait_exit())
+ d = self.proc(["direct", "", "0", "examples", "20"])
+ port, line = wait_listening(d)
+ expect = line
+ self.assertEqual(send_expect(), self.runex("send", port))
+ expect += receive_expect_messages()
+ d.wait_re(expect)
+ self.assertEqual(send_abort_expect(), self.runex("send-abort", port))
+ expect += "Message aborted\n"*MESSAGES
+ d.wait_re(expect)
+ self.assertEqual(send_expect(), self.runex("send", port))
+ expect += receive_expect_messages()+receive_expect_total(20)
+ self.maxDiff = None
+ self.assertMultiLineEqual(expect, d.wait_exit())
def test_send_ssl_receive(self):
"""Send first then receive"""
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 590b6a6..0fd5864 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -440,6 +440,7 @@ set (qpid-proton-private-includes
src/reactor/selectable.h
src/platform/platform.h
src/platform/platform_fmt.h
+ src/proactor/netaddr-internal.h
src/proactor/proactor-internal.h
)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/include/proton/netaddr.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/netaddr.h b/proton-c/include/proton/netaddr.h
index 7f21d93..47f1ac0 100644
--- a/proton-c/include/proton/netaddr.h
+++ b/proton-c/include/proton/netaddr.h
@@ -51,14 +51,39 @@ PNP_EXTERN int pn_netaddr_str(const pn_netaddr_t *addr, char *buf, size_t size);
/**
* Get the local address of a transport. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (PN_TRANSPORT_CLOSED event is handled)
*/
PNP_EXTERN const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t);
/**
- * Get the remote address of a transport. Return NULL if not available.
+ * Get the remote address of a transport. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (PN_TRANSPORT_CLOSED event is handled)
*/
PNP_EXTERN const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t);
+/**
+ * Get the listening addresses of a listener.
+ *
+ * A listener can have more than one address for several reasons:
+ * - DNS host records may indicate more than one address
+ * - On a multi-homed host, listening on the default host "" will listen on all local addresses.
+ * - Some IPv4/IPv6 configurations may expand a single address into a v4/v6 pair.
+ *
+ * pn_netaddr_next() will iterate over all the addresses in the list.
+ *
+ * @param l points to the listener
+ * @return The first listening address or NULL if no addresses are available.
+ * Use pn_netaddr_next() to iterate over the list.
+ * Pointer is invalid after the listener closes (PN_LISTENER_CLOSED event is handled)
+ */
+PNP_EXTERN const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l);
+
+/**
+ * @return Pointer to the next address in a list of addresses, NULL if at the end of the list or
+ * if this address is not part of a list.
+ */
+PNP_EXTERN const pn_netaddr_t *pn_netaddr_next(const pn_netaddr_t *na);
+
struct sockaddr;
/**
@@ -74,6 +99,12 @@ PNP_EXTERN const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na);
PNP_EXTERN size_t pn_netaddr_socklen(const pn_netaddr_t *na);
/**
+ * Get the host and port name from na as separate strings.
+ * Returns 0 if successful, non-0 on error.
+ */
+PNP_EXTERN int pn_netaddr_host_port(const pn_netaddr_t* na, char *host, size_t hlen, char *port, size_t plen);
+
+/**
* @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 8f6dacb..3d507ff 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -32,7 +32,6 @@
#include <proton/condition.h>
#include <proton/connection_driver.h>
#include <proton/engine.h>
-#include <proton/netaddr.h>
#include <proton/object.h>
#include <proton/proactor.h>
#include <proton/transport.h>
@@ -57,6 +56,8 @@
#include <limits.h>
#include <time.h>
+#include "./netaddr-internal.h" /* Include after socket/inet headers */
+
// TODO: replace timerfd per connection with global lightweight timer mechanism.
// logging in general
// SIGPIPE?
@@ -507,10 +508,6 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listene
pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
}
-struct pn_netaddr_t {
- struct sockaddr_storage ss;
-};
-
typedef struct pconnection_t {
psocket_t psocket;
pcontext_t context;
@@ -553,6 +550,7 @@ struct acceptor_t{
bool armed;
bool overflowed;
acceptor_t *next; /* next listener list member */
+ struct pn_netaddr_t addr; /* listening address */
};
struct pn_listener_t {
@@ -1435,8 +1433,10 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t));
assert(l->acceptors); /* TODO aconway 2017-05-05: memory safety */
l->acceptors_size = 0;
+ uint16_t dynamic_port = 0; /* Record dynamic port from first bind(0) */
/* Find working listen addresses */
for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+ if (dynamic_port) set_port(ai->ai_addr, dynamic_port);
int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
static int on = 1;
if (fd >= 0) {
@@ -1448,6 +1448,15 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
!listen(fd, backlog))
{
acceptor_t *acceptor = &l->acceptors[l->acceptors_size++];
+ /* Get actual address */
+ socklen_t len = pn_netaddr_socklen(&acceptor->addr);
+ (void)getsockname(fd, (struct sockaddr*)(&acceptor->addr.ss), &len);
+ if (acceptor == l->acceptors) { /* First acceptor, check for dynamic port */
+ dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&acceptor->addr));
+ } else { /* Link addr to previous addr */
+ (acceptor-1)->addr.next = &acceptor->addr;
+ }
+
acceptor->accepted_fd = -1;
psocket_t *ps = &acceptor->psocket;
psocket_init(ps, p, l, addr);
@@ -2195,14 +2204,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
wake_notify(&p->context);
}
-const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) {
- return (struct sockaddr*)na;
-}
-
-size_t pn_netaddr_socklen(const pn_netaddr_t *na) {
- return sizeof(struct sockaddr_storage);
-}
-
const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) {
pconnection_t *pc = get_pconnection(pn_transport_connection(t));
return pc? &pc->local : NULL;
@@ -2213,26 +2214,8 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) {
return pc ? &pc->remote : NULL;
}
-#ifndef NI_MAXHOST
-# define NI_MAXHOST 1025
-#endif
-
-#ifndef NI_MAXSERV
-# define NI_MAXSERV 32
-#endif
-
-int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
- int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss),
- host, sizeof(host), port, sizeof(port),
- NI_NUMERICHOST | NI_NUMERICSERV);
- if (!err) {
- return snprintf(buf, len, "%s:%s", host, port);
- } else {
- if (buf) *buf = '\0';
- return 0;
- }
+const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l) {
+ return l->acceptors_size > 0 ? &l->acceptors[0].addr : NULL;
}
pn_millis_t pn_proactor_now(void) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 7be83fe..edbe214 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -32,12 +32,12 @@
#include <proton/engine.h>
#include <proton/listener.h>
#include <proton/message.h>
-#include <proton/netaddr.h>
#include <proton/object.h>
#include <proton/proactor.h>
#include <proton/transport.h>
#include <uv.h>
+#include "netaddr-internal.h" /* Include after socket headers via uv.h */
/* All asserts are cheap and should remain in a release build for debuggability */
#undef NDEBUG
@@ -157,10 +157,6 @@ PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket)
typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state;
-struct pn_netaddr_t {
- struct sockaddr_storage ss;
-};
-
/* An incoming or outgoing connection. */
typedef struct pconnection_t {
work_t work; /* Must be first to allow casting */
@@ -212,6 +208,11 @@ struct pn_listener_t {
/* Only used by leader */
addr_t addr;
lsocket_t *lsockets;
+ int dynamic_port; /* Record dynamic port from first bind(0) */
+
+ /* Invariant listening addresses allocated during leader_listen_lh() */
+ struct pn_netaddr_t *addrs;
+ int addrs_len;
/* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
* detach a listener from the UV loop to prevent concurrent access.
@@ -602,10 +603,20 @@ static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
if (err) {
free(ls); /* Will never be closed */
} else {
+ if (l->dynamic_port) set_port(ai->ai_addr, l->dynamic_port);
int flags = (ai->ai_family == AF_INET6) ? UV_TCP_IPV6ONLY : 0;
err = uv_tcp_bind(&ls->tcp, ai->ai_addr, flags);
if (!err) err = uv_listen((uv_stream_t*)&ls->tcp, l->backlog, on_connection);
if (!err) {
+ /* Get actual listening address */
+ pn_netaddr_t *na = &l->addrs[l->addrs_len++];
+ int len = sizeof(na->ss);
+ uv_tcp_getsockname(&ls->tcp, (struct sockaddr*)(&na->ss), &len);
+ if (na == l->addrs) { /* First socket, check for dynamic port bind */
+ l->dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(na));
+ } else {
+ (na-1)->next = na; /* Link into list */
+ }
/* Add to l->lsockets list */
ls->parent = l;
ls->next = l->lsockets;
@@ -624,6 +635,13 @@ static void leader_listen_lh(pn_listener_t *l) {
add_active(l->work.proactor);
int err = leader_resolve(l->work.proactor, &l->addr, true);
if (!err) {
+ /* Allocate enough space for the pn_netaddr_t addresses */
+ size_t len = 0;
+ for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
+ ++len;
+ }
+ l->addrs = (pn_netaddr_t*)calloc(len, sizeof(lsocket_t));
+
/* Find the working addresses */
for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
int err2 = lsocket(l, ai);
@@ -646,6 +664,7 @@ static void leader_listen_lh(pn_listener_t *l) {
void pn_listener_free(pn_listener_t *l) {
if (l) {
+ if (l->addrs) free(l->addrs);
if (l->addr.getaddrinfo.addrinfo) { /* Interrupted after resolve */
uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
}
@@ -1285,14 +1304,6 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t
work_notify(&l->work);
}
-const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) {
- return (struct sockaddr*)na;
-}
-
-size_t pn_netaddr_socklen(const pn_netaddr_t *na) {
- return sizeof(struct sockaddr_storage);
-}
-
const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) {
pconnection_t *pc = get_pconnection(pn_transport_connection(t));
return pc? &pc->local : NULL;
@@ -1303,18 +1314,8 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) {
return pc ? &pc->remote : NULL;
}
-int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
- int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss),
- host, sizeof(host), port, sizeof(port),
- NI_NUMERICHOST | NI_NUMERICSERV);
- if (!err) {
- return snprintf(buf, len, "%s:%s", host, port);
- } else {
- if (buf) *buf = '\0';
- return 0;
- }
+const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l) {
+ return (l->addrs && l->addrs_len > 0) ? &l->addrs[0] : NULL; /* First recorded listening address; NULL if none was recorded */
}
pn_millis_t pn_proactor_now(void) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/netaddr-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/netaddr-internal.h b/proton-c/src/proactor/netaddr-internal.h
new file mode 100644
index 0000000..16e406c
--- /dev/null
+++ b/proton-c/src/proactor/netaddr-internal.h
@@ -0,0 +1,96 @@
+#ifndef PROACTOR_NETADDR_INTERNAL_H
+#define PROACTOR_NETADDR_INTERNAL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/netaddr.h>
+
+/* Common code for proactors that use the POSIX/Winsock sockaddr library for socket addresses. */
+
+struct pn_netaddr_t {
+ struct sockaddr_storage ss;
+ pn_netaddr_t *next;
+};
+
+const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) {
+ return na ? (struct sockaddr*)&na->ss : NULL;
+}
+
+size_t pn_netaddr_socklen(const pn_netaddr_t *na) {
+ return sizeof(na->ss);
+}
+
+const pn_netaddr_t *pn_netaddr_next(const pn_netaddr_t *na) {
+ return na ? na->next : NULL;
+}
+
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
+int pn_netaddr_host_port(const pn_netaddr_t* na, char *host, size_t hlen, char *port, size_t plen) {
+ return getnameinfo(pn_netaddr_sockaddr(na), pn_netaddr_socklen(na),
+ host, hlen, port, plen, NI_NUMERICHOST | NI_NUMERICSERV);
+}
+
+int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
+ char host[NI_MAXHOST];
+ char port[NI_MAXSERV];
+ int err = pn_netaddr_host_port(na, host, sizeof(host), port, sizeof(port));
+ if (!err) {
+ return pn_proactor_addr(buf, len, host, port);
+ } else {
+ if (buf) *buf = '\0';
+ return 0;
+ }
+}
+
+/* Return port or -1 if sa is not a known address type */
+static int get_port(const struct sockaddr *sa) {
+ switch (sa->sa_family) {
+ case AF_INET: return ((struct sockaddr_in*)sa)->sin_port;
+ case AF_INET6: return ((struct sockaddr_in6*)sa)->sin6_port;
+ default: return -1;
+ }
+}
+
+/* Set the port in sa or do nothing if it is not a known address type */
+static void set_port(struct sockaddr *sa, uint16_t port) {
+ switch (sa->sa_family) {
+ case AF_INET: ((struct sockaddr_in*)sa)->sin_port = port; break;
+ case AF_INET6: ((struct sockaddr_in6*)sa)->sin6_port = port; break;
+ default: break;
+ }
+}
+
+/* If want has port=0 and got has port > 0 then return port of got, else return 0 */
+static uint16_t check_dynamic_port(const struct sockaddr *want, const struct sockaddr *got) {
+ if (get_port(want) == 0) {
+ int port = get_port(got);
+ if (port > 0) return (uint16_t)port;
+ }
+ return 0;
+}
+
+#endif /*!PROACTOR_NETADDR_INTERNAL_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/proactor-internal.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.c b/proton-c/src/proactor/proactor-internal.c
index f3f834a..6d96fee 100644
--- a/proton-c/src/proactor/proactor-internal.c
+++ b/proton-c/src/proactor/proactor-internal.c
@@ -37,15 +37,16 @@ static const char *AMQPS_PORT_NAME = "amqps";
const char *PNI_IO_CONDITION = "proton:io";
-#ifndef _WIN32
-/*
- * Common implementation for C99-friendly compilers. Windows is
- * not and implements its own.
- */
int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) {
- return snprintf(buf, len, "%s:%s", host ? host : "", port ? port : "");
+ /* Write "host:port" to buf, truncated to fit len; return the untruncated length. Don't use snprintf, Windows is not C99 compliant and snprintf is broken. */
+ if (buf && len > 0) {
+ buf[0] = '\0'; /* strncat's count is the max chars to APPEND, so pass the space remaining, not len */
+ if (host) strncat(buf, host, len - 1);
+ strncat(buf, ":", len - 1 - strlen(buf));
+ if (port) strncat(buf, port, len - 1 - strlen(buf));
+ }
+ return (host ? strlen(host) : 0) + (port ? strlen(port) : 0) + 1;
}
-#endif
int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, const char **port)
{
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/proactor-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.h b/proton-c/src/proactor/proactor-internal.h
index 7cc7363..67c0bf6 100644
--- a/proton-c/src/proactor/proactor-internal.h
+++ b/proton-c/src/proactor/proactor-internal.h
@@ -1,5 +1,5 @@
-#ifndef PROACTOR_NETADDR_INTERNAL_H
-#define PROACTOR_NETADDR_INTERNAL_H
+#ifndef PROACTOR_PROACTOR_INTERNAL_H
+#define PROACTOR_PROACTOR_INTERNAL_H
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -48,4 +48,4 @@ extern const char *PNI_IO_CONDITION;
void pni_proactor_set_cond(
pn_condition_t *cond, const char *what, const char *host, const char *port, const char *msg);
-#endif // PROACTOR_NETADDR_INTERNAL_H
+#endif /*!PROACTOR_PROACTOR_INTERNAL_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c
index 0be4b51..71f9b0d 100644
--- a/proton-c/src/proactor/win_iocp.c
+++ b/proton-c/src/proactor/win_iocp.c
@@ -21,7 +21,6 @@
#include <proton/condition.h>
#include <proton/connection_driver.h>
-#include <proton/netaddr.h>
#include <proton/engine.h>
#include <proton/message.h>
#include <proton/object.h>
@@ -45,6 +44,8 @@
#include <iostream>
#include <sstream>
+#include "./netaddr-internal.h" /* Include after socket/inet headers */
+
/*
* Proactor for Windows using IO completion ports.
*
@@ -1657,8 +1658,9 @@ static void pcontext_finalize(pcontext_t* ctx) {
}
typedef struct psocket_t {
- iocpdesc_t *iocpd; // NULL if reaper, or socket open failure.
+ iocpdesc_t *iocpd; /* NULL if reaper, or socket open failure. */
pn_listener_t *listener; /* NULL for a connection socket */
+ pn_netaddr_t listen_addr; /* Not filled in for connection sockets */
char addr_buf[PN_MAX_ADDR];
const char *host, *port;
bool is_reaper;
@@ -1695,10 +1697,6 @@ struct pn_proactor_t {
bool shutting_down;
};
-struct pn_netaddr_t {
- struct sockaddr_storage ss;
-};
-
typedef struct pconnection_t {
psocket_t psocket;
pcontext_t context;
@@ -2786,8 +2784,10 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t));
assert(l->psockets); /* TODO: memory safety */
l->psockets_size = 0;
+ uint16_t dynamic_port = 0; /* Record dynamic port from first bind(0) */
/* Find working listen addresses */
for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+ if (dynamic_port) set_port(ai->ai_addr, dynamic_port);
// Note fd destructor can clear WSAGetLastError()
unique_socket fd(::socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol));
if (fd != INVALID_SOCKET) {
@@ -2796,13 +2796,21 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
if (!::listen(fd, backlog)) {
iocpdesc_t *iocpd = pni_iocpdesc_create(p->iocp, fd);
if (iocpd) {
- fd.release();
+ pn_socket_t sock = fd.release();
psocket_t *ps = &l->psockets[l->psockets_size++];
psocket_init(ps, l, false, addr);
ps->iocpd = iocpd;
iocpd->is_mp = true;
iocpd->active_completer = ps;
pni_iocpdesc_start(ps->iocpd);
+ /* Get actual address */
+ socklen_t len = sizeof(ps->listen_addr.ss);
+ (void)getsockname(sock, (struct sockaddr*)&ps->listen_addr.ss, &len);
+ if (ps == l->psockets) { /* First socket, check for dynamic port */
+ dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&ps->listen_addr));
+ } else {
+ (ps-1)->listen_addr.next = &ps->listen_addr; /* Link into list */
+ }
}
}
}
@@ -3393,17 +3401,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
}
}
-
-static int pni2_snprintf(char *buf, size_t count, const char *fmt, ...);
-
-const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) {
- return (struct sockaddr*)na;
-}
-
-size_t pn_netaddr_socklen(const pn_netaddr_t *na) {
- return sizeof(struct sockaddr_storage);
-}
-
const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) {
pconnection_t *pc = get_pconnection(pn_transport_connection(t));
return pc? &pc->local : NULL;
@@ -3414,26 +3411,8 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) {
return pc ? &pc->remote : NULL;
}
-#ifndef NI_MAXHOST
-# define NI_MAXHOST 1025
-#endif
-
-#ifndef NI_MAXSERV
-# define NI_MAXSERV 32
-#endif
-
-int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
- int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss),
- host, sizeof(host), port, sizeof(port),
- NI_NUMERICHOST | NI_NUMERICSERV);
- if (!err) {
- return pni2_snprintf(buf, len, "%s:%s", host, port);
- } else {
- if (buf) *buf = '\0';
- return 0;
- }
+const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l) {
+ return (l->psockets && l->psockets_size > 0) ? &l->psockets[0].listen_addr : NULL; /* First listening socket's address; NULL if no socket is listening */
}
pn_millis_t pn_proactor_now(void) {
@@ -3445,40 +3424,3 @@ pn_millis_t pn_proactor_now(void) {
// Convert to milliseconds and adjust base epoch
return t.QuadPart / 10000 - 11644473600000;
}
-
-
-// ======================================================================
-// Platform dependent sprintf for pn_proactor_addr()
-// ======================================================================
-
-#include <stdarg.h>
-// [v]snprintf on Windows only matches C99 when no errors or overflow.
-static int pni2_vsnprintf(char *buf, size_t count, const char *fmt, va_list ap) {
- if (fmt == NULL)
- return -1;
- if ((buf == NULL) && (count > 0))
- return -1;
- if (count > 0) {
- int n = vsnprintf_s(buf, count, _TRUNCATE, fmt, ap);
- if (n >= 0) // no overflow
- return n; // same as C99
- buf[count-1] = '\0';
- }
- // separate call to get needed buffer size on overflow
- int n = _vscprintf(fmt, ap);
- if (n >= (int) count)
- return n;
- return -1;
-}
-
-static int pni2_snprintf(char *buf, size_t count, const char *fmt, ...) {
- va_list ap;
- va_start(ap, fmt);
- int n = pni2_vsnprintf(buf, count, fmt, ap);
- va_end(ap);
- return n;
-}
-
-int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) {
- return pni2_snprintf(buf, len, "%s:%s", host ? host : "", port ? port : "");
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/fdlimit.py
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/fdlimit.py b/proton-c/src/tests/fdlimit.py
index 53751cb..57faef8 100644
--- a/proton-c/src/tests/fdlimit.py
+++ b/proton-c/src/tests/fdlimit.py
@@ -20,17 +20,19 @@ from __future__ import print_function
from proctest import *
+def wait_listening(proc):
+ m = proc.wait_re("listening on ([0-9]+)$")
+ return m.group(1), m.group(0)+"\n" # Return (port, line)
+
class LimitedBroker(object):
def __init__(self, test, fdlimit):
self.test = test
self.fdlimit = fdlimit
def __enter__(self):
- with TestPort() as tp:
- self.port = str(tp.port)
- self.proc = self.test.proc(['prlimit', '-n%d' % self.fdlimit, 'broker', '', self.port])
- self.proc.wait_re("listening")
- return self
+ self.proc = self.test.proc(["prlimit", "-n%d" % self.fdlimit, "broker", "", "0"]) # Keep the fd limit; port "0" asks for a dynamic port
+ self.port, _ = wait_listening(self.proc)
+ return self
def __exit__(self, *args):
b = getattr(self, "proc")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 844cf95..a9eef18 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -17,7 +17,6 @@
* under the License.
*/
-#include "test_port.h"
#include "test_tools.h"
#include "test_handler.h"
#include "test_config.h"
@@ -36,8 +35,6 @@
#include <stdlib.h>
#include <string.h>
-static const char *localhost = ""; /* host for connect/listen */
-
#define ARRAYLEN(A) (sizeof(A)/sizeof((A)[0]))
/* Proactor and handler that take part in a test */
@@ -136,19 +133,37 @@ static void test_proactors_drain(test_proactor_t *tps, size_t n) {
test_proactor_destroy((A)+i); \
} while (0)
-/* Combine a test_port with a pn_listener */
-typedef struct test_listener_t {
- test_port_t port;
- pn_listener_t *listener;
-} test_listener_t;
-/* Return a listening test_listener_t, raise errors if not successful */
-test_listener_t test_listen(test_proactor_t *tp, const char *host) {
- test_listener_t l = { test_port(host), pn_listener() };
- pn_proactor_listen(tp->proactor, l.listener, l.port.host_port, 4);
+#define MAX_STR 256
+struct addrinfo { /* String forms of a listener's address, filled in by listener_info(). NOTE(review): same tag as POSIX struct addrinfo in <netdb.h> - confirm that header is never pulled into this file */
+ char host[MAX_STR]; /* host of the first listening address */
+ char port[MAX_STR]; /* port of the first listening address; listener_info() asserts the other addresses report the same port */
+ char connect[MAX_STR]; /* pn_proactor_addr() of an empty host and `port`; the tests pass it to pn_proactor_connect2() */
+ char host_port[MAX_STR]; /* output buffer of the pn_netaddr_str() call in listener_info() */
+};
+
+/* Describe the address(es) a listener is bound to, as strings for use in tests.
+ *
+ * l: the listener to inspect; the first address from pn_netaddr_listening(l)
+ *    must resolve via pn_netaddr_host_port() or TEST_ASSERT fails.
+ * Returns a struct addrinfo by value:
+ *   host, port - taken from the first listening address
+ *   connect    - pn_proactor_addr() of an empty host and that port
+ *   host_port  - pn_netaddr_str() of the first listening address
+ * Every further listening address is asserted to report the same port.
+ */
+struct addrinfo listener_info(pn_listener_t *l) {
+ struct addrinfo ai = {{0}};
+ const pn_netaddr_t *first = pn_netaddr_listening(l);
+ const pn_netaddr_t *na;
+ TEST_ASSERT(0 == pn_netaddr_host_port(first, ai.host, sizeof(ai.host), ai.port, sizeof(ai.port)));
+ for (na = pn_netaddr_next(first); na; na = pn_netaddr_next(na)) { /* Check that ports are consistent */
+ char port[MAX_STR];
+ TEST_ASSERT(0 == pn_netaddr_host_port(na, NULL, 0, port, sizeof(port)));
+ TEST_ASSERTF(0 == strcmp(port, ai.port), "%s != %s", port, ai.port);
+ }
+ (void)pn_proactor_addr(ai.connect, sizeof(ai.connect), "", ai.port); /* Address for connecting */
+ /* Use `first` here: the loop above only ends once its cursor `na` is NULL */
+ (void)pn_netaddr_str(first, ai.host_port, sizeof(ai.host_port)); /* host:port listening address */
+ return ai;
+}
+
+/* Listen on `host` with port "0" and return the pn_listener_t*; raises test errors unless the next event on tp is PN_LISTENER_OPEN with last_condition empty. Callers read the actual port back with listener_info() */
+pn_listener_t *test_listen(test_proactor_t *tp, const char *host) {
+ char addr[1024];
+ pn_listener_t *l = pn_listener();
+ (void)pn_proactor_addr(addr, sizeof(addr), host, "0");
+ pn_proactor_listen(tp->proactor, l, addr, 4);
TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1));
TEST_COND_EMPTY(tp->handler.t, last_condition);
- test_port_close(&l.port);
return l;
}
@@ -283,9 +298,9 @@ static pn_event_type_t open_close_handler(test_handler_t *th, pn_event_t *e) {
/* Test simple client/server connection with 2 proactors */
static void test_client_server(test_t *t) {
test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, common_handler) };
- test_listener_t l = test_listen(&tps[1], localhost);
+ pn_listener_t *l = test_listen(&tps[1], "");
/* Connect and wait for close at both ends */
- pn_proactor_connect2(tps[0].proactor, NULL, NULL, l.port.host_port);
+ pn_proactor_connect2(tps[0].proactor, NULL, NULL, listener_info(l).connect);
TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
TEST_PROACTORS_DESTROY(tps);
@@ -308,11 +323,11 @@ static pn_event_type_t open_wake_handler(test_handler_t *th, pn_event_t *e) {
static void test_connection_wake(test_t *t) {
test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
pn_proactor_t *client = tps[0].proactor;
- test_listener_t l = test_listen(&tps[1], localhost);
+ pn_listener_t *l = test_listen(&tps[1], "");
pn_connection_t *c = pn_connection();
pn_incref(c); /* Keep a reference for wake() after free */
- pn_proactor_connect2(client, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
pn_connection_wake(c);
@@ -325,7 +340,7 @@ static void test_connection_wake(test_t *t) {
/* Verify we don't get a wake after close even if they happen together */
pn_connection_t *c2 = pn_connection();
- pn_proactor_connect2(client, c2, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_connection_wake(c2);
pn_proactor_disconnect(client, NULL);
@@ -360,8 +375,8 @@ static pn_event_type_t listen_abort_handler(test_handler_t *th, pn_event_t *e) {
static void test_abort(test_t *t) {
test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_abort_handler) };
pn_proactor_t *client = tps[0].proactor;
- test_listener_t l = test_listen(&tps[1], localhost);
- pn_proactor_connect2(client, NULL, NULL, l.port.host_port);
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
/* server transport closes */
if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
@@ -374,7 +389,7 @@ static void test_abort(test_t *t) {
TEST_COND_DESC(t, "abort", last_condition);
}
- pn_listener_close(l.listener);
+ pn_listener_close(l);
while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
@@ -419,14 +434,14 @@ static pn_event_type_t listen_refuse_handler(test_handler_t *th, pn_event_t *e)
static void test_refuse(test_t *t) {
test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_refuse_handler) };
pn_proactor_t *client = tps[0].proactor;
- test_listener_t l = test_listen(&tps[1], localhost);
- pn_proactor_connect2(client, NULL, NULL, l.port.host_port);
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
/* client transport closes */
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* client */
TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
- pn_listener_close(l.listener);
+ pn_listener_close(l);
while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
@@ -456,9 +471,9 @@ static void test_inactive(test_t *t) {
pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
/* Listen, connect, disconnect */
- test_listener_t l = test_listen(&tps[1], localhost);
+ pn_listener_t *l = test_listen(&tps[1], "");
pn_connection_t *c = pn_connection();
- pn_proactor_connect2(client, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_connection_wake(c);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
@@ -475,7 +490,7 @@ static void test_inactive(test_t *t) {
/* Connect, set-timer, disconnect */
pn_proactor_set_timeout(client, 1000000);
c = pn_connection();
- pn_proactor_connect2(client, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_connection_wake(c);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
@@ -489,7 +504,7 @@ static void test_inactive(test_t *t) {
/* Server won't be INACTIVE until listener is closed */
TEST_CHECK(t, pn_proactor_get(server) == NULL);
- pn_listener_close(l.listener);
+ pn_listener_close(l);
TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
@@ -529,12 +544,12 @@ static void test_errors(test_t *t) {
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
/* Listen on a port already in use */
- test_port_t port = test_port(localhost);
pn_listener_t *l = pn_listener();
- pn_proactor_listen(server, l, port.host_port, 1);
+ pn_proactor_listen(server, l, ":0", 1);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, TEST_PROACTORS_RUN(tps));
test_handler_clear(&tps[1].handler, 0);
- pn_proactor_listen(server, pn_listener(), port.host_port, 1); /* Busy */
+ struct addrinfo laddr = listener_info(l);
+ pn_proactor_listen(server, pn_listener(), laddr.connect, 1); /* Busy */
TEST_PROACTORS_RUN(tps);
TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
TEST_COND_NAME(t, "proton:io", last_condition);
@@ -544,13 +559,12 @@ static void test_errors(test_t *t) {
/* Connect with no listener */
c = pn_connection();
- pn_proactor_connect2(client, c, NULL, port.host_port);
+ pn_proactor_connect2(client, c, NULL, laddr.connect);
if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
TEST_COND_DESC(t, "refused", last_condition);
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
}
- test_port_close(&port);
TEST_PROACTORS_DESTROY(tps);
}
@@ -590,25 +604,27 @@ static void test_ipv4_ipv6(test_t *t) {
pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
/* Listen on all interfaces for IPv4 only. */
- test_listener_t l4 = test_listen(&tps[1], "0.0.0.0");
+ pn_listener_t *l4 = test_listen(&tps[1], "0.0.0.0");
TEST_PROACTORS_DRAIN(tps);
/* Empty address listens on both IPv4 and IPv6 on all interfaces */
- test_listener_t l = test_listen(&tps[1], "");
+ pn_listener_t *l = test_listen(&tps[1], "");
TEST_PROACTORS_DRAIN(tps);
-#define EXPECT_CONNECT(TP, HOST) do { \
- pn_proactor_connect2(client, NULL, NULL, test_port_use_host(&(TP), (HOST))); \
- TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); \
+#define EXPECT_CONNECT(LISTENER, HOST) do { \
+ char addr[1024]; \
+ pn_proactor_addr(addr, sizeof(addr), HOST, listener_info(LISTENER).port); \
+ pn_proactor_connect2(client, NULL, NULL, addr); \
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); \
TEST_COND_EMPTY(t, last_condition); \
- TEST_PROACTORS_DRAIN(tps); \
+ TEST_PROACTORS_DRAIN(tps); \
} while(0)
- EXPECT_CONNECT(l4.port, "127.0.0.1"); /* v4->v4 */
- EXPECT_CONNECT(l4.port, ""); /* local->v4*/
+ EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */
+ EXPECT_CONNECT(l4, ""); /* local->v4*/
- EXPECT_CONNECT(l.port, "127.0.0.1"); /* v4->all */
- EXPECT_CONNECT(l.port, ""); /* local->all */
+ EXPECT_CONNECT(l, "127.0.0.1"); /* v4->all */
+ EXPECT_CONNECT(l, ""); /* local->all */
/* Listen on ipv6 loopback, if it fails skip ipv6 tests.
@@ -617,24 +633,24 @@ static void test_ipv4_ipv6(test_t *t) {
local ipv6 loopback configured, so "::1" will force an error.
*/
TEST_PROACTORS_DRAIN(tps);
- test_listener_t l6 = { test_port("::1"), pn_listener() };
- pn_proactor_listen(server, l6.listener, l6.port.host_port, 4);
+ pn_listener_t *l6 = pn_listener();
+ pn_proactor_listen(server, l6, "::1:0", 4);
pn_event_type_t e = TEST_PROACTORS_RUN(tps);
if (e == PN_LISTENER_OPEN && !pn_condition_is_set(last_condition)) {
TEST_PROACTORS_DRAIN(tps);
- EXPECT_CONNECT(l6.port, "::1"); /* v6->v6 */
- EXPECT_CONNECT(l6.port, ""); /* local->v6 */
- EXPECT_CONNECT(l.port, "::1"); /* v6->all */
+ EXPECT_CONNECT(l6, "::1"); /* v6->v6 */
+ EXPECT_CONNECT(l6, ""); /* local->v6 */
+ EXPECT_CONNECT(l, "::1"); /* v6->all */
- pn_listener_close(l6.listener);
+ pn_listener_close(l6);
} else {
const char *d = pn_condition_get_description(last_condition);
TEST_LOGF(t, "skip IPv6 tests: %s %s", pn_event_type_name(e), d ? d : "no condition");
}
- pn_listener_close(l.listener);
- pn_listener_close(l4.listener);
+ pn_listener_close(l);
+ pn_listener_close(l4);
TEST_PROACTORS_DESTROY(tps);
}
@@ -642,15 +658,15 @@ static void test_ipv4_ipv6(test_t *t) {
static void test_release_free(test_t *t) {
test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
pn_proactor_t *client = tps[0].proactor;
- test_listener_t l = test_listen(&tps[1], localhost);
+ pn_listener_t *l = test_listen(&tps[1], "");
/* leave one connection to the proactor */
- pn_proactor_connect2(client, NULL, NULL, l.port.host_port);
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
/* release c1 and free immediately */
pn_connection_t *c1 = pn_connection();
- pn_proactor_connect2(client, c1, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c1, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_proactor_release_connection(c1); /* We free but socket should still be cleaned up */
pn_connection_free(c1);
@@ -659,7 +675,7 @@ static void test_release_free(test_t *t) {
/* release c2 but don't free it until after the proactor is freed */
pn_connection_t *c2 = pn_connection();
- pn_proactor_connect2(client, c2, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_proactor_release_connection(c2);
TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
@@ -747,10 +763,10 @@ static void test_ssl(test_t *t) {
pn_ssl_domain_t *cd = client->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
pn_ssl_domain_t *sd = server->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
TEST_CHECK(t, 0 == SET_CREDENTIALS(sd, "tserver"));
- test_listener_t l = test_listen(server, localhost);
+ pn_listener_t *l = test_listen(server, "");
/* Basic SSL connection */
- pn_proactor_connect2(client->proactor, NULL, NULL, l.port.host_port);
+ pn_proactor_connect2(client->proactor, NULL, NULL, listener_info(l).connect);
/* Open ok at both ends */
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
TEST_COND_EMPTY(t, last_condition);
@@ -764,7 +780,7 @@ static void test_ssl(test_t *t) {
TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_peer_authentication(cd, PN_SSL_VERIFY_PEER_NAME, NULL));
pn_connection_t *c = pn_connection();
pn_connection_set_hostname(c, "test_server");
- pn_proactor_connect2(client->proactor, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
TEST_COND_EMPTY(t, last_condition);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
@@ -775,7 +791,7 @@ static void test_ssl(test_t *t) {
/* Verify peer with bad hostname */
c = pn_connection();
pn_connection_set_hostname(c, "wrongname");
- pn_proactor_connect2(client->proactor, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
TEST_COND_DESC(t, "SSL", last_condition);
@@ -852,9 +868,9 @@ static void test_netaddr(test_t *t) {
test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
pn_proactor_t *client = tps[0].proactor;
/* Use IPv4 to get consistent results all platforms */
- test_listener_t l = test_listen(&tps[1], "127.0.0.1");
+ pn_listener_t *l = test_listen(&tps[1], "127.0.0.1");
pn_connection_t *c = pn_connection();
- pn_proactor_connect2(client, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
if (!TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps))) {
TEST_COND_EMPTY(t, last_condition); /* Show the last condition */
return; /* don't continue if connection is closed */
@@ -866,7 +882,7 @@ static void test_netaddr(test_t *t) {
pn_transport_t *ct = pn_connection_transport(c);
const pn_netaddr_t *na = pn_netaddr_remote(ct);
pn_netaddr_str(na, cr, sizeof(cr));
- TEST_STR_IN(t, test_port_use_host(&l.port, ""), cr); /* remote address has listening port */
+ TEST_STR_IN(t, listener_info(l).port, cr); /* remote address has listening port */
pn_connection_t *s = last_accepted; /* server side of the connection */
@@ -879,17 +895,12 @@ static void test_netaddr(test_t *t) {
pn_netaddr_str(pn_netaddr_remote(st), sr, sizeof(sr));
TEST_STR_EQUAL(t, cl, sr); /* client local == server remote */
- /* Examine as sockaddr */
- const struct sockaddr *sa = pn_netaddr_sockaddr(na);
- TEST_CHECK(t, AF_INET == sa->sa_family);
- char host[TEST_PORT_MAX_STR] = "";
- char serv[TEST_PORT_MAX_STR] = "";
- int err = getnameinfo(sa, pn_netaddr_socklen(na),
- host, sizeof(host), serv, sizeof(serv),
- NI_NUMERICHOST | NI_NUMERICSERV);
+ char host[MAX_STR] = "";
+ char serv[MAX_STR] = "";
+ int err = pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv));
TEST_CHECK(t, 0 == err);
TEST_STR_EQUAL(t, "127.0.0.1", host);
- TEST_STR_EQUAL(t, l.port.str, serv);
+ TEST_STR_EQUAL(t, listener_info(l).port, serv);
/* Make sure you can use NULL, 0 to get length of address string without a crash */
size_t len = pn_netaddr_str(pn_netaddr_local(ct), NULL, 0);
@@ -905,15 +916,15 @@ static void test_disconnect(test_t *t) {
pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
/* Start two listeners */
- test_listener_t l = test_listen(&tps[1], localhost);
- test_listener_t l2 = test_listen(&tps[1], localhost);
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_listener_t *l2 = test_listen(&tps[1], "");
/* Only wait for one connection to remote-open before disconnect */
pn_connection_t *c = pn_connection();
- pn_proactor_connect2(client, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_connection_t *c2 = pn_connection();
- pn_proactor_connect2(client, c2, NULL, l2.port.host_port);
+ pn_proactor_connect2(client, c2, NULL, listener_info(l2).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
TEST_PROACTORS_DRAIN(tps);
@@ -944,8 +955,8 @@ static void test_disconnect(test_t *t) {
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
/* Make sure the proactors are still functional */
- test_listener_t l3 = test_listen(&tps[1], localhost);
- pn_proactor_connect2(client, NULL, NULL, l3.port.host_port);
+ pn_listener_t *l3 = test_listen(&tps[1], "");
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l3).connect);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
pn_proactor_disconnect(client, NULL);
@@ -1027,7 +1038,7 @@ static void test_message_stream(test_t *t) {
test_proactor(t, message_stream_handler)
};
pn_proactor_t *client = tps[0].proactor;
- test_listener_t l = test_listen(&tps[1], localhost);
+ pn_listener_t *l = test_listen(&tps[1], "");
struct message_stream_context ctx = { 0 };
tps[0].handler.context = &ctx;
tps[1].handler.context = &ctx;
@@ -1042,7 +1053,7 @@ static void test_message_stream(test_t *t) {
pn_message_free(m);
pn_connection_t *c = pn_connection();
- pn_proactor_connect2(client, c, NULL, l.port.host_port);
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
pn_session_t *ssn = pn_session(c);
pn_session_open(ssn);
pn_link_t *snd = pn_sender(ssn, "x");
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/test_port.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_port.h b/proton-c/src/tests/test_port.h
deleted file mode 100644
index b252dd9..0000000
--- a/proton-c/src/tests/test_port.h
+++ /dev/null
@@ -1,142 +0,0 @@
-#ifndef TESTS_TEST_PORT_H
-#define TESTS_TEST_PORT_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/* Some simple platform-specifics to acquire an unused socket */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#if defined(_WIN32)
-
-# include <winsock2.h>
-# include <ws2tcpip.h>
-
-typedef SOCKET sock_t;
-
-static void test_snprintf(char *buf, size_t count, const char *fmt, ...) {
- va_list ap;
- va_start(ap, fmt);
- _vsnprintf(buf, count, fmt, ap);
- buf[count-1] = '\0'; /* _vsnprintf doesn't null-terminate on overflow */
-}
-
-void check_err(int ret, const char *what) {
- if (ret) {
- char buf[512];
- FormatMessage(
- FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, WSAGetLastError(), NULL, buf, sizeof(buf), NULL);
- fprintf(stderr, "%s: %s\n", what, buf);
- abort();
- }
-}
-
-#else /* POSIX */
-
-# include <sys/types.h>
-# include <sys/socket.h>
-# include <netinet/in.h>
-# include <unistd.h>
-# include <netdb.h>
-# define test_snprintf snprintf
-
-typedef int sock_t;
-
-void check_err(int ret, const char *what) {
- if (ret) {
- perror(what); abort();
- }
-}
-
-#endif
-
-#define TEST_PORT_MAX_STR 1060
-
-/* Combines a sock_t with the int and char* versions of the port for convenience */
-typedef struct test_port_t {
- sock_t sock;
- int port; /* port as integer */
- char str[TEST_PORT_MAX_STR]; /* port as string */
- char host_port[TEST_PORT_MAX_STR]; /* host:port string */
-} test_port_t;
-
-/* Modifies tp->host_port to use host, returns the new tp->host_port */
-const char *test_port_use_host(test_port_t *tp, const char *host) {
- test_snprintf(tp->host_port, sizeof(tp->host_port), "%s:%d", host, tp->port);
- return tp->host_port;
-}
-
-/* Create a socket and bind(INADDR_LOOPBACK:0) to get a free port.
- Set socket options so the port can be bound and used for listen() within this process,
- even though it is bound to the test_port socket.
- Use host to create the host_port address string.
-*/
-test_port_t test_port(const char* host) {
-#ifdef _WIN32
- static int wsa_started = 0;
- if (!wsa_started) {
- WORD wsa_ver = MAKEWORD(2, 2);
- WSADATA unused;
- check_err(WSAStartup(wsa_ver, &unused), "WSAStartup");
- }
-#endif
- int err = 0;
- test_port_t tp = {0};
- tp.sock = socket(AF_INET, SOCK_STREAM, 0);
- check_err(tp.sock < 0, "socket");
-#ifndef _WIN32
- int on = 1;
- check_err(setsockopt(tp.sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)), "setsockopt");
-#endif
- struct sockaddr_in addr = {0};
- addr.sin_family = AF_INET; /* set the type of connection to TCP/IP */
- addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
- addr.sin_port = 0; /* bind to port 0 */
- err = bind(tp.sock, (struct sockaddr*)&addr, sizeof(addr));
- check_err(err, "bind");
- socklen_t len = sizeof(addr);
- err = getsockname(tp.sock, (struct sockaddr*)&addr, &len); /* Get the bound port */
- check_err(err, "getsockname");
- tp.port = ntohs(addr.sin_port);
- test_snprintf(tp.str, sizeof(tp.str), "%d", tp.port);
- test_port_use_host(&tp, host);
-#ifdef _WIN32 /* Windows doesn't support the twice-open socket trick */
- closesocket(tp.sock);
-#elif defined (__APPLE__) || defined(__FreeBSD__)
- close(tp.sock);
-#endif
- return tp;
-}
-
-void test_port_close(test_port_t *tp) {
-#ifdef _WIN32
- WSACleanup();
-#elif defined (__APPLE__) || defined(__FreeBSD__)
- // We already closed and have no other cleanup to do
-#else
- close(tp->sock);
-#endif
-}
-
-
-#endif // TESTS_TEST_PORT_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 11354ee..d046a43 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -61,7 +61,7 @@ void test_vlogf_(test_t *t, const char *prefix, const char* expr,
}
if (t) fprintf(stderr, " [%s]", t->name);
fprintf(stderr, "\n");
- fflush(stdout);
+ fflush(stderr);
}
void test_logf_(test_t *t, const char *prefix, const char* expr,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1706: fix compiler warnings in
windows c examples
Posted by ac...@apache.org.
PROTON-1706: fix compiler warnings in windows c examples
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4dfe2969
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4dfe2969
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4dfe2969
Branch: refs/heads/master
Commit: 4dfe296920c9345dfbe8c9c535cbeb27055c01c2
Parents: 7bc25c6
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jan 4 12:47:25 2018 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jan 4 12:52:09 2018 -0500
----------------------------------------------------------------------
examples/c/broker.c | 14 ++++++++------
examples/c/direct.c | 2 +-
2 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4dfe2969/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index dee8526..3257cec 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -77,16 +77,17 @@
/* Simple thread-safe queue implementation */
typedef struct queue_t {
pthread_mutex_t lock;
- char name[256];
- VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */
- VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+ char *name;
+ VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */
+ VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
struct queue_t *next; /* Next queue in chain */
size_t sent; /* Count of messages sent, used as delivery tag */
} queue_t;
static void queue_init(queue_t *q, const char* name, queue_t *next) {
pthread_mutex_init(&q->lock, NULL);
- strncpy(q->name, name, sizeof(q->name)-1);
+ q->name = (char*)malloc(strlen(name)+1);
+ memcpy(q->name, name, strlen(name)+1);
VEC_INIT(q->messages);
VEC_INIT(q->waiting);
q->next = next;
@@ -102,6 +103,7 @@ static void queue_destroy(queue_t *q) {
for (i = 0; i < q->waiting.len; ++i)
pn_decref(q->waiting.data[i]);
VEC_FINAL(q->waiting);
+ free(q->name);
}
/* Send a message on s, or record s as waiting if no messages.
@@ -357,7 +359,7 @@ static void handle(broker_t* b, pn_event_t* e) {
pn_link_t *l = pn_delivery_link(d);
size_t size = pn_delivery_pending(d);
pn_rwbytes_t* m = message_buffer(l); /* Append data to incoming message buffer */
- int recv;
+ ssize_t recv;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
recv = pn_link_recv(l, m->start, m->size);
@@ -368,7 +370,7 @@ static void handle(broker_t* b, pn_event_t* e) {
pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
pn_link_flow(l, WINDOW - pn_link_credit(l)); /* Replace credit for the aborted message */
} else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */
- pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv));
+ pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code((int)recv));
pn_link_close(l); /* Unexpected error, close the link */
} else if (!pn_delivery_partial(d)) { /* Message is complete */
const char *qname = pn_terminus_get_address(pn_link_target(l));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4dfe2969/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
index 1943dcc..9ecda7c 100644
--- a/examples/c/direct.c
+++ b/examples/c/direct.c
@@ -145,7 +145,7 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
pn_link_t *l = pn_delivery_link(d);
size_t size = pn_delivery_pending(d);
pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
- int recv;
+ ssize_t recv;
m->size += size;
m->start = (char*)realloc(m->start, m->size);
recv = pn_link_recv(l, m->start, m->size);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org