You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/05/25 18:02:50 UTC
qpid-dispatch git commit: DISPATCH-774: modify HTTP code to work with
proton 0.17
Repository: qpid-dispatch
Updated Branches:
refs/heads/0.8.x d4a6cc538 -> 3d2ea0ef5
DISPATCH-774: modify HTTP code to work with proton 0.17
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3d2ea0ef
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3d2ea0ef
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3d2ea0ef
Branch: refs/heads/0.8.x
Commit: 3d2ea0ef5accb69393aed2f6afb15ba73a5dc9a6
Parents: d4a6cc5
Author: Alan Conway <ac...@redhat.com>
Authored: Thu May 25 13:59:36 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu May 25 13:59:36 2017 -0400
----------------------------------------------------------------------
src/http-libwebsockets.c | 70 +++++++++++++++++++++++++------------------
1 file changed, 41 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3d2ea0ef/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index dc8ff58..96734f8 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -22,7 +22,9 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
-#include <proton/connection_driver.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/transport.h>
#include <libwebsockets.h>
@@ -90,8 +92,8 @@ static void buffer_set_size(buffer_t *buf, size_t size) {
/* AMQPWS connection: set as lws user data and qd_conn->context */
struct qd_http_connection_t {
- pn_connection_driver_t driver;
- qd_connection_t* qd_conn;
+ qd_connection_t *qd_conn;
+ pn_transport_t *transport;
buffer_t wbuf; /* LWS requires allocated header space at start of buffer */
struct lws *wsi;
char name[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port */
@@ -150,10 +152,11 @@ static int handle_events(qd_http_connection_t* c) {
return unexpected_close(c->wsi, "not-established");
}
qd_connection_process(c->qd_conn);
- if (pn_connection_driver_write_buffer(&c->driver).size) {
- lws_callback_on_writable(c->wsi);
+ if (pn_transport_pending(c->transport) > 0) {
+ lws_callback_on_writable(c->wsi);
}
- if (pn_connection_driver_finished(&c->driver)) {
+ bool has_event = pn_collector_peek(pn_connection_collector(c->qd_conn->pn_conn));
+ if (pn_transport_closed(c->transport) && !has_event) {
lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
c->closed = true;
qd_connection_process(c->qd_conn);
@@ -395,10 +398,18 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
c->qd_conn->listener = hl->listener;
lws_get_peer_simple(wsi, c->hostip, sizeof(c->hostip));
strncpy(c->name, c->hostip, sizeof(c->name));
- int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL);
- if (err) {
- return unexpected_close(c->wsi, pn_code(err));
+
+ c->qd_conn->pn_conn = pn_connection();
+ c->transport = pn_transport();
+ c->qd_conn->collector = pn_collector();
+ if (!c->qd_conn->pn_conn || !c->transport || !c->qd_conn->collector) {
+ if (c->qd_conn->pn_conn) pn_connection_free(c->qd_conn->pn_conn);
+ if (c->transport) pn_transport_free(c->transport);
+ if (c->qd_conn->collector) pn_collector_free(c->qd_conn->collector);
+ return unexpected_close(c->wsi, "out of memory");
}
+ pn_connection_collect(c->qd_conn->pn_conn, c->qd_conn->collector);
+
c->qd_conn->http = c;
c->qd_conn->server = hs->server;
c->qd_conn->connection_id = qd_server_connection_id(c->qd_conn->server);
@@ -406,24 +417,23 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
c->qd_conn->policy_counted = false;
const qd_server_config_t *config = hl->listener->config;
c->qd_conn->role = strdup(config->role);
- c->qd_conn->pn_conn = c->driver.connection;
pn_connection_set_context(c->qd_conn->pn_conn, ctx);
- c->qd_conn->collector = c->driver.collector;
qd_server_decorate_connection(c->qd_conn->server, c->qd_conn->pn_conn, config);
qd_log(hs->log, QD_LOG_DEBUG,
"[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
qd_connection_connection_id(c->qd_conn), c->hostip);
- pn_connection_driver_bind(&c->driver);
+ pn_transport_bind(c->transport, c->qd_conn->pn_conn);
return handle_events(c);
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
if (handle_events(c)) return -1;
- pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
- if (dbuf.size) {
+ ssize_t pending = pn_transport_pending(c->transport);
+ if (pending > 0) {
+ pn_bytes_t dbuf = pn_bytes(pending, pn_transport_head(c->transport));
/* lws_write() demands LWS_PRE bytes of free space before the data,
- * so we must copy from the driver's buffer to larger temporary wbuf
+ * so we must copy from the transport buffer to larger temporary wbuf
*/
buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
if (c->wbuf.start == NULL) {
@@ -433,10 +443,10 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
memcpy(buf, dbuf.start, dbuf.size);
ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
if (wrote < 0) {
- pn_connection_driver_write_close(&c->driver);
+ pn_transport_close_head(c->transport);
return unexpected_close(c->wsi, "write-error");
} else {
- pn_connection_driver_write_done(&c->driver, wrote);
+ pn_transport_pop(c->transport, wrote);
}
}
return handle_events(c);
@@ -445,13 +455,14 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
case LWS_CALLBACK_RECEIVE: {
while (len > 0) {
if (handle_events(c)) return -1;
- pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
- if (dbuf.size == 0) {
- return unexpected_close(c->wsi, "unexpected-data");
+ ssize_t cap = pn_transport_capacity(c->transport);
+ if (cap <= 0) {
+ return unexpected_close(c->wsi, "unexpected-close");
}
+ pn_rwbytes_t dbuf = pn_rwbytes(cap, pn_transport_tail(c->transport));
size_t copy = (len < dbuf.size) ? len : dbuf.size;
memcpy(dbuf.start, in, copy);
- pn_connection_driver_read_done(&c->driver, copy);
+ pn_transport_process(c->transport, copy);
len -= copy;
in = (char*)in + copy;
}
@@ -459,7 +470,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
}
case LWS_CALLBACK_USER: {
- pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+ pn_timestamp_t next_tick = pn_transport_tick(c->transport, hs->now);
if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) {
hs->next_tick = next_tick;
}
@@ -467,20 +478,21 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
}
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
- pn_connection_driver_read_close(&c->driver);
+ pn_transport_close_tail(c->transport);
return handle_events(c);
}
case LWS_CALLBACK_CLOSED: {
qd_log(wsi_log(wsi), QD_LOG_DEBUG, "HTTP connection closed to %s from %s",
wsi_listener(wsi)->host_port, c->name);
- if (c->driver.transport) {
- pn_connection_driver_close(&c->driver);
+ if (c->transport) {
+ pn_transport_close_tail(c->transport);
+ pn_transport_close_head(c->transport);
handle_events(c);
}
- pn_connection_driver_destroy(&c->driver);
- c->qd_conn->pn_conn = NULL;
- c->qd_conn->collector = NULL;
+ pn_transport_free(c->transport);
+ pn_connection_free(c->qd_conn->pn_conn);
+ pn_collector_free(c->qd_conn->collector);
qd_connection_free(c->qd_conn);
free(c->wbuf.start);
return -1;
@@ -539,7 +551,7 @@ static void* http_thread_run(void* v) {
break;
case W_WAKE: {
qd_http_connection_t *c = w.value;
- pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+ pn_collector_put(c->qd_conn->collector, PN_OBJECT, c->qd_conn->pn_conn,
PN_CONNECTION_WAKE);
handle_events(c);
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org