You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC
svn commit: r1622849 [6/9] - in
/qpid/proton/branches/fadams-javascript-binding: ./ contrib/
contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/
contrib/proton-hawtdispatch/src/main/
contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...
Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c Sat Sep 6 11:23:10 2014
@@ -24,6 +24,7 @@
#include <string.h>
#include <proton/framing.h>
#include "protocol.h"
+#include "dispatch_actions.h"
#include <assert.h>
#include <stdarg.h>
@@ -42,7 +43,7 @@ static ssize_t transport_consume(pn_tran
void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next)
{
- db->deliveries = pn_hash(1024, 0.75, PN_REFCOUNT);
+ db->deliveries = pn_hash(0, 0.75, PN_REFCOUNT);
db->next = next;
}
@@ -92,16 +93,6 @@ void pn_delivery_map_clear(pn_delivery_m
}
}
-int pn_do_open(pn_dispatcher_t *disp);
-int pn_do_begin(pn_dispatcher_t *disp);
-int pn_do_attach(pn_dispatcher_t *disp);
-int pn_do_transfer(pn_dispatcher_t *disp);
-int pn_do_flow(pn_dispatcher_t *disp);
-int pn_do_disposition(pn_dispatcher_t *disp);
-int pn_do_detach(pn_dispatcher_t *disp);
-int pn_do_end(pn_dispatcher_t *disp);
-int pn_do_close(pn_dispatcher_t *disp);
-
static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available);
static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
@@ -150,16 +141,6 @@ static void pn_transport_initialize(void
amqp->buffered_input = NULL;
amqp->next = NULL;
- pn_dispatcher_action(transport->disp, OPEN, pn_do_open);
- pn_dispatcher_action(transport->disp, BEGIN, pn_do_begin);
- pn_dispatcher_action(transport->disp, ATTACH, pn_do_attach);
- pn_dispatcher_action(transport->disp, TRANSFER, pn_do_transfer);
- pn_dispatcher_action(transport->disp, FLOW, pn_do_flow);
- pn_dispatcher_action(transport->disp, DISPOSITION, pn_do_disposition);
- pn_dispatcher_action(transport->disp, DETACH, pn_do_detach);
- pn_dispatcher_action(transport->disp, END, pn_do_end);
- pn_dispatcher_action(transport->disp, CLOSE, pn_do_close);
-
transport->open_sent = false;
transport->open_rcvd = false;
transport->close_sent = false;
@@ -178,11 +159,10 @@ static void pn_transport_initialize(void
transport->remote_idle_timeout = 0;
transport->keepalive_deadline = 0;
transport->last_bytes_output = 0;
- transport->remote_offered_capabilities = pn_data(16);
- transport->remote_desired_capabilities = pn_data(16);
- transport->remote_properties = pn_data(16);
- transport->disp_data = pn_data(16);
- transport->error = pn_error();
+ transport->remote_offered_capabilities = pn_data(0);
+ transport->remote_desired_capabilities = pn_data(0);
+ transport->remote_properties = pn_data(0);
+ transport->disp_data = pn_data(0);
pn_condition_init(&transport->remote_condition);
transport->local_channels = pn_hash(0, 0.75, PN_REFCOUNT);
@@ -193,6 +173,8 @@ static void pn_transport_initialize(void
transport->input_pending = 0;
transport->output_pending = 0;
+
+ transport->done_processing = false;
}
pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -200,14 +182,20 @@ pn_session_t *pn_channel_state(pn_transp
return (pn_session_t *) pn_hash_get(transport->remote_channels, channel);
}
-static void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_t *session)
+static void pni_map_remote_channel(pn_session_t *session, uint16_t channel)
{
+ pn_transport_t *transport = session->connection->transport;
pn_hash_put(transport->remote_channels, channel, session);
session->state.remote_channel = channel;
}
-void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn)
+void pni_transport_unbind_handles(pn_hash_t *handles);
+
+static void pni_unmap_remote_channel(pn_session_t *ssn)
{
+ // XXX: should really update link state also
+ pni_transport_unbind_handles(ssn->state.remote_handles);
+ pn_transport_t *transport = ssn->connection->transport;
uint16_t channel = ssn->state.remote_channel;
ssn->state.remote_channel = -2;
// note: may free the session:
@@ -222,7 +210,7 @@ static void pn_transport_finalize(void *
pn_transport_t *pn_transport()
{
- static pn_class_t clazz = PN_CLASS(pn_transport);
+ static const pn_class_t clazz = PN_CLASS(pn_transport);
pn_transport_t *transport = (pn_transport_t *) pn_new(sizeof(pn_transport_t),
&clazz);
if (!transport) return NULL;
@@ -265,7 +253,6 @@ static void pn_transport_finalize(void *
pn_free(transport->remote_desired_capabilities);
pn_free(transport->remote_properties);
pn_free(transport->disp_data);
- pn_error_free(transport->error);
pn_condition_tini(&transport->remote_condition);
pn_free(transport->local_channels);
pn_free(transport->remote_channels);
@@ -281,22 +268,35 @@ int pn_transport_bind(pn_transport_t *tr
if (connection->transport) return PN_STATE_ERR;
transport->connection = connection;
connection->transport = transport;
- pn_incref(connection);
+ pn_incref2(connection, transport);
if (transport->open_rcvd) {
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
- pn_event_t *event = pn_collector_put(connection->collector,
- PN_CONNECTION_REMOTE_STATE);
- if (event) {
- pn_event_init_connection(event, connection);
- }
- if (!pn_error_code(transport->error)) {
- transport->disp->halt = false;
- transport_consume(transport); // blech - testBindAfterOpen
- }
+ pn_collector_put(connection->collector, PN_CONNECTION_REMOTE_OPEN, connection);
+ transport->disp->halt = false;
+ transport_consume(transport); // blech - testBindAfterOpen
}
return 0;
}
+void pni_transport_unbind_handles(pn_hash_t *handles)
+{
+ for (pn_handle_t h = pn_hash_head(handles); h; h = pn_hash_next(handles, h)) {
+ uintptr_t key = pn_hash_key(handles, h);
+ pn_hash_del(handles, key);
+ }
+}
+
+void pni_transport_unbind_channels(pn_hash_t *channels)
+{
+ for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) {
+ uintptr_t key = pn_hash_key(channels, h);
+ pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h);
+ pni_transport_unbind_handles(ssn->state.local_handles);
+ pni_transport_unbind_handles(ssn->state.remote_handles);
+ pn_hash_del(channels, key);
+ }
+}
+
int pn_transport_unbind(pn_transport_t *transport)
{
assert(transport);
@@ -305,6 +305,7 @@ int pn_transport_unbind(pn_transport_t *
pn_connection_t *conn = transport->connection;
transport->connection = NULL;
+ // XXX: what happens if the endpoints are freed before we get here?
pn_session_t *ssn = pn_session_head(conn, 0);
while (ssn) {
pn_delivery_map_clear(&ssn->state.incoming);
@@ -319,28 +320,31 @@ int pn_transport_unbind(pn_transport_t *
endpoint = endpoint->endpoint_next;
}
+ pni_transport_unbind_channels(transport->local_channels);
+ pni_transport_unbind_channels(transport->remote_channels);
+
pn_connection_unbound(conn);
- pn_decref(conn);
+ pn_decref2(conn, transport);
return 0;
}
pn_error_t *pn_transport_error(pn_transport_t *transport)
{
- return transport->error;
+ return NULL;
}
-static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link)
+static void pni_map_remote_handle(pn_link_t *link, uint32_t handle)
{
link->state.remote_handle = handle;
- pn_hash_put(ssn->state.remote_handles, handle, link);
+ pn_hash_put(link->session->state.remote_handles, handle, link);
}
-void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link)
+static void pni_unmap_remote_handle(pn_link_t *link)
{
- uint32_t handle = link->state.remote_handle;
+ uintptr_t handle = link->state.remote_handle;
link->state.remote_handle = -2;
// may delete link:
- pn_hash_del(ssn->state.remote_handles, handle);
+ pn_hash_del(link->session->state.remote_handles, handle);
}
pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle)
@@ -392,13 +396,12 @@ void pni_disposition_encode(pn_dispositi
}
}
-int pn_post_close(pn_transport_t *transport, const char *condition)
+int pn_post_close(pn_transport_t *transport, const char *condition, const char *description)
{
pn_condition_t *cond = NULL;
if (transport->connection) {
cond = pn_connection_condition(transport->connection);
}
- const char *description = NULL;
pn_data_t *info = NULL;
if (!condition && pn_condition_is_set(cond)) {
condition = pn_condition_get_name(cond);
@@ -418,13 +421,16 @@ int pn_do_error(pn_transport_t *transpor
// XXX: result
vsnprintf(buf, 1024, fmt, ap);
va_end(ap);
- pn_error_set(transport->error, PN_ERR, buf);
if (!transport->close_sent) {
- pn_post_close(transport, condition);
+ if (!transport->open_sent) {
+ pn_post_frame(transport->disp, 0, "DL[S]", OPEN, "");
+ }
+
+ pn_post_close(transport, condition, buf);
transport->close_sent = true;
}
transport->disp->halt = true;
- pn_transport_logf(transport, "ERROR %s %s", condition, pn_error_text(transport->error));
+ pn_transport_logf(transport, "ERROR %s %s", condition, buf);
return PN_ERR;
}
@@ -459,7 +465,6 @@ int pn_do_open(pn_dispatcher_t *disp)
}
disp->remote_max_frame = transport->remote_max_frame;
pn_buffer_clear( disp->frame );
- pn_buffer_ensure( disp->frame, disp->remote_max_frame );
}
if (container_q) {
transport->remote_container = pn_bytes_strdup(remote_container);
@@ -474,12 +479,7 @@ int pn_do_open(pn_dispatcher_t *disp)
if (conn) {
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
-
- pn_event_t *event = pn_collector_put(conn->collector,
- PN_CONNECTION_REMOTE_STATE);
- if (event) {
- pn_event_init_connection(event, conn);
- }
+ pn_collector_put(conn->collector, PN_CONNECTION_REMOTE_OPEN, conn);
} else {
transport->disp->halt = true;
}
@@ -506,15 +506,9 @@ int pn_do_begin(pn_dispatcher_t *disp)
ssn = pn_session(transport->connection);
}
ssn->state.incoming_transfer_count = next;
- pn_map_channel(transport, disp->channel, ssn);
+ pni_map_remote_channel(ssn, disp->channel);
PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE);
-
- pn_event_t *event = pn_collector_put(transport->connection->collector,
- PN_SESSION_REMOTE_STATE);
- if (event) {
- pn_event_init_session(event, ssn);
- }
-
+ pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_OPEN, ssn);
return 0;
}
@@ -537,18 +531,18 @@ pn_link_t *pn_find_link(pn_session_t *ss
static pn_expiry_policy_t symbol2policy(pn_bytes_t symbol)
{
if (!symbol.start)
- return PN_SESSION_CLOSE;
+ return PN_EXPIRE_WITH_SESSION;
if (!strncmp(symbol.start, "link-detach", symbol.size))
- return PN_LINK_CLOSE;
+ return PN_EXPIRE_WITH_LINK;
if (!strncmp(symbol.start, "session-end", symbol.size))
- return PN_SESSION_CLOSE;
+ return PN_EXPIRE_WITH_SESSION;
if (!strncmp(symbol.start, "connection-close", symbol.size))
- return PN_CONNECTION_CLOSE;
+ return PN_EXPIRE_WITH_CONNECTION;
if (!strncmp(symbol.start, "never", symbol.size))
- return PN_NEVER;
+ return PN_EXPIRE_NEVER;
- return PN_SESSION_CLOSE;
+ return PN_EXPIRE_WITH_SESSION;
}
static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol)
@@ -613,6 +607,10 @@ int pn_do_attach(pn_dispatcher_t *disp)
strname[name.size] = '\0';
pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+ if (!ssn) {
+ pn_do_error(transport, "amqp:connection:no-session", "attach without a session");
+ return PN_EOS;
+ }
pn_link_t *link = pn_find_link(ssn, name, is_sender);
if (!link) {
if (is_sender) {
@@ -626,7 +624,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
free(strheap);
}
- pn_map_handle(ssn, handle, link);
+ pni_map_remote_handle(link, handle);
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
pn_terminus_t *rsrc = &link->remote_source;
if (source.start || src_dynamic) {
@@ -684,12 +682,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
link->state.delivery_count = idc;
}
- pn_event_t *event = pn_collector_put(transport->connection->collector,
- PN_LINK_REMOTE_STATE);
- if (event) {
- pn_event_init_link(event, link);
- }
-
+ pn_collector_put(transport->connection->collector, PN_LINK_REMOTE_OPEN, link);
return 0;
}
@@ -770,11 +763,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
pn_post_flow(transport, ssn, link);
}
- pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY);
- if (event) {
- pn_event_init_delivery(event, delivery);
- }
-
+ pn_collector_put(transport->connection->collector, PN_DELIVERY, delivery);
return 0;
}
@@ -824,10 +813,7 @@ int pn_do_flow(pn_dispatcher_t *disp)
}
}
- pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_FLOW);
- if (event) {
- pn_event_init_link(event, link);
- }
+ pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link);
}
return 0;
@@ -922,10 +908,7 @@ int pn_do_disposition(pn_dispatcher_t *d
delivery->updated = true;
pn_work_update(transport->connection, delivery);
- pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY);
- if (event) {
- pn_event_init_delivery(event, delivery);
- }
+ pn_collector_put(transport->connection->collector, PN_DELIVERY, delivery);
}
}
@@ -955,16 +938,12 @@ int pn_do_detach(pn_dispatcher_t *disp)
if (closed)
{
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
- pn_event_t *event = pn_collector_put(transport->connection->collector,
- PN_LINK_REMOTE_STATE);
- if (event) {
- pn_event_init_link(event, link);
- }
+ pn_collector_put(transport->connection->collector, PN_LINK_REMOTE_CLOSE, link);
} else {
// TODO: implement
}
- pn_unmap_handle(ssn, link);
+ pni_unmap_remote_handle(link);
return 0;
}
@@ -975,12 +954,8 @@ int pn_do_end(pn_dispatcher_t *disp)
int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
if (err) return err;
PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
- pn_event_t *event = pn_collector_put(transport->connection->collector,
- PN_SESSION_REMOTE_STATE);
- if (event) {
- pn_event_init_session(event, ssn);
- }
- pn_unmap_channel(transport, ssn);
+ pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_CLOSE, ssn);
+ pni_unmap_remote_channel(ssn);
return 0;
}
@@ -992,11 +967,7 @@ int pn_do_close(pn_dispatcher_t *disp)
if (err) return err;
transport->close_rcvd = true;
PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
- pn_event_t *event = pn_collector_put(transport->connection->collector,
- PN_CONNECTION_REMOTE_STATE);
- if (event) {
- pn_event_init_connection(event, conn);
- }
+ pn_collector_put(transport->connection->collector, PN_CONNECTION_REMOTE_CLOSE, conn);
return 0;
}
@@ -1043,11 +1014,7 @@ static ssize_t transport_consume(pn_tran
} else if (n == 0) {
break;
} else {
- if (n != PN_EOS) {
- pn_transport_logf(transport, "ERROR[%i] %s\n",
- pn_error_code(transport->error),
- pn_error_text(transport->error));
- }
+ assert(n == PN_EOS);
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_transport_log(transport, " <- EOS");
transport->input_pending = 0; // XXX ???
@@ -1071,9 +1038,10 @@ static ssize_t pn_input_read_header(pn_t
if (!available || memcmp(bytes, point, delta)) {
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
- return pn_error_format(transport->error, PN_ERR,
- "%s header mismatch: '%s'%s", protocol, quoted,
- available ? "" : " (connection aborted)");
+ pn_do_error(transport, "amqp:connection:framing-error",
+ "%s header mismatch: '%s'%s", protocol, quoted,
+ available ? "" : " (connection aborted)");
+ return PN_EOS;
} else {
transport->header_count += delta;
if (transport->header_count == size) {
@@ -1102,21 +1070,20 @@ static ssize_t pn_input_read_amqp(pn_io_
if (transport->close_rcvd) {
if (available > 0) {
pn_do_error(transport, "amqp:connection:framing-error", "data after close");
- return PN_ERR;
- } else {
return PN_EOS;
}
}
if (!available) {
pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
- return PN_ERR;
+ return PN_EOS;
}
ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
if (n < 0) {
- return pn_error_set(transport->error, n, "dispatch error");
+ //return pn_error_set(transport->error, n, "dispatch error");
+ return PN_EOS;
} else if (transport->close_rcvd) {
return PN_EOS;
} else {
@@ -1230,6 +1197,15 @@ size_t pn_session_incoming_window(pn_ses
}
}
+static void pni_map_local_channel(pn_session_t *ssn)
+{
+ pn_transport_t *transport = ssn->connection->transport;
+ pn_session_state_t *state = &ssn->state;
+ uint16_t channel = allocate_alias(transport->local_channels);
+ state->local_channel = channel;
+ pn_hash_put(transport->local_channels, channel, ssn);
+}
+
int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION && transport->open_sent)
@@ -1238,16 +1214,14 @@ int pn_process_ssn_setup(pn_transport_t
pn_session_state_t *state = &ssn->state;
if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
{
- uint16_t channel = allocate_alias(transport->local_channels);
+ pni_map_local_channel(ssn);
state->incoming_window = pn_session_incoming_window(ssn);
state->outgoing_window = pn_session_outgoing_window(ssn);
- pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
+ pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN,
((int16_t) state->remote_channel >= 0), state->remote_channel,
state->outgoing_transfer_count,
state->incoming_window,
state->outgoing_window);
- state->local_channel = channel;
- pn_hash_put(transport->local_channels, channel, ssn);
}
}
@@ -1258,18 +1232,25 @@ static const char *expiry_symbol(pn_expi
{
switch (policy)
{
- case PN_LINK_CLOSE:
+ case PN_EXPIRE_WITH_LINK:
return "link-detach";
- case PN_SESSION_CLOSE:
+ case PN_EXPIRE_WITH_SESSION:
return NULL;
- case PN_CONNECTION_CLOSE:
+ case PN_EXPIRE_WITH_CONNECTION:
return "connection-close";
- case PN_NEVER:
+ case PN_EXPIRE_NEVER:
return "never";
}
return NULL;
}
+static void pni_map_local_handle(pn_link_t *link) {
+ pn_link_state_t *state = &link->state;
+ pn_session_state_t *ssn_state = &link->session->state;
+ state->local_handle = allocate_alias(ssn_state->local_handles);
+ pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+}
+
int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (transport->open_sent && (endpoint->type == SENDER ||
@@ -1281,8 +1262,7 @@ int pn_process_link_setup(pn_transport_t
if (((int16_t) ssn_state->local_channel >= 0) &&
!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1)
{
- state->local_handle = allocate_alias(ssn_state->local_handles);
- pn_hash_put(ssn_state->local_handles, state->local_handle, link);
+ pni_map_local_handle(link);
const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
int err = pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH,
@@ -1464,6 +1444,8 @@ int pn_process_tpwork_sender(pn_transpor
link->queued--;
link->session->outgoing_deliveries--;
}
+
+ pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link);
}
}
@@ -1573,6 +1555,14 @@ int pn_process_flow_sender(pn_transport_
return 0;
}
+static void pni_unmap_local_handle(pn_link_t *link) {
+ pn_link_state_t *state = &link->state;
+ uintptr_t handle = state->local_handle;
+ state->local_handle = -2;
+ // may delete link
+ pn_hash_del(link->session->state.local_handles, handle);
+}
+
int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SENDER || endpoint->type == RECEIVER)
@@ -1601,8 +1591,7 @@ int pn_process_link_teardown(pn_transpor
int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH,
state->local_handle, true, (bool) name, ERROR, name, description, info);
if (err) return err;
- pn_hash_del(ssn_state->local_handles, state->local_handle);
- state->local_handle = -2;
+ pni_unmap_local_handle(link);
}
pn_clear_modified(transport->connection, endpoint);
@@ -1634,6 +1623,17 @@ bool pn_pointful_buffering(pn_transport_
return false;
}
+static void pni_unmap_local_channel(pn_session_t *ssn) {
+ // XXX: should really update link state also
+ pni_transport_unbind_handles(ssn->state.local_handles);
+ pn_transport_t *transport = ssn->connection->transport;
+ pn_session_state_t *state = &ssn->state;
+ uintptr_t channel = state->local_channel;
+ state->local_channel = -2;
+ // may delete session
+ pn_hash_del(transport->local_channels, channel);
+}
+
int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SESSION)
@@ -1643,7 +1643,9 @@ int pn_process_ssn_teardown(pn_transport
if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
&& !transport->close_sent)
{
- if (pn_pointful_buffering(transport, session)) return 0;
+ if (pn_pointful_buffering(transport, session)) {
+ return 0;
+ }
const char *name = NULL;
const char *description = NULL;
@@ -1658,8 +1660,7 @@ int pn_process_ssn_teardown(pn_transport
int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END,
(bool) name, ERROR, name, description, info);
if (err) return err;
- pn_hash_del(transport->local_channels, state->local_channel);
- state->local_channel = -2;
+ pni_unmap_local_channel(session);
}
pn_clear_modified(transport->connection, endpoint);
@@ -1673,7 +1674,7 @@ int pn_process_conn_teardown(pn_transpor
{
if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
if (pn_pointful_buffering(transport, NULL)) return 0;
- int err = pn_post_close(transport, NULL);
+ int err = pn_post_close(transport, NULL, NULL);
if (err) return err;
transport->close_sent = true;
}
@@ -1733,13 +1734,10 @@ static ssize_t pn_output_write_header(pn
{
if (transport->disp->trace & PN_TRACE_FRM)
pn_transport_logf(transport, " -> %s", protocol);
- if (size >= hdrsize) {
- memmove(bytes, header, hdrsize);
- transport->io_layers[PN_IO_AMQP].process_output = next;
- return hdrsize;
- } else {
- return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
- }
+ assert(size >= hdrsize);
+ memmove(bytes, header, hdrsize);
+ transport->io_layers[PN_IO_AMQP].process_output = next;
+ return hdrsize;
}
static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
@@ -1752,22 +1750,19 @@ static ssize_t pn_output_write_amqp_head
static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
{
pn_transport_t *transport = (pn_transport_t *)io_layer->context;
- if (!transport->connection) {
- return 0;
- }
-
- if (!pn_error_code(transport->error)) {
- pn_error_set(transport->error, pn_process(transport), "process error");
+ if (transport->connection && !transport->done_processing) {
+ int err = pn_process(transport);
+ if (err) {
+ pn_transport_logf(transport, "process error %i", err);
+ transport->done_processing = true;
+ }
}
- // write out any buffered data _before_ returning an error code,
- // else we could truncate an outgoing Close frame containing a
- // useful error status
- if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
- if (pn_error_code(transport->error))
- return pn_error_code(transport->error);
- else
- return PN_EOS;
+ // write out any buffered data _before_ returning PN_EOS, else we
+ // could truncate an outgoing Close frame containing a useful error
+ // status
+ if (!transport->disp->available && transport->close_sent) {
+ return PN_EOS;
}
return pn_dispatcher_output(transport->disp, bytes, size);
@@ -1779,20 +1774,18 @@ static ssize_t transport_produce(pn_tran
pn_io_layer_t *io_layer = transport->io_layers;
ssize_t space = transport->output_size - transport->output_pending;
- if (space == 0) { // can we expand the buffer?
+ if (space <= 0) { // can we expand the buffer?
int more = 0;
if (!transport->remote_max_frame) // no limit, so double it
more = transport->output_size;
else if (transport->remote_max_frame > transport->output_size)
- more = transport->remote_max_frame - transport->output_size;
+ more = pn_min(transport->output_size, transport->remote_max_frame - transport->output_size);
if (more) {
- char *newbuf = (char *)malloc( transport->output_size + more );
+ char *newbuf = (char *)realloc( transport->output_buf, transport->output_size + more );
if (newbuf) {
- memmove( newbuf, transport->output_buf, transport->output_pending );
- free( transport->output_buf );
transport->output_buf = newbuf;
transport->output_size += more;
- space = more;
+ space += more;
}
}
}
@@ -1811,11 +1804,12 @@ static ssize_t transport_produce(pn_tran
if (transport->output_pending)
break; // return what is available
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
- if (n == PN_EOS)
+ if (n < 0) {
pn_transport_log(transport, " -> EOS");
- else
+ }
+ /*else
pn_transport_logf(transport, " -> EOS (%" PN_ZI ") %s", n,
- pn_error_text(transport->error));
+ pn_error_text(transport->error));*/
}
return n;
}
@@ -1993,22 +1987,20 @@ ssize_t pn_transport_capacity(pn_transpo
//if (pn_error_code(transport->error)) return pn_error_code(transport->error);
ssize_t capacity = transport->input_size - transport->input_pending;
- if (!capacity) {
+ if ( capacity<=0 ) {
// can we expand the size of the input buffer?
int more = 0;
if (!transport->local_max_frame) { // no limit (ha!)
more = transport->input_size;
} else if (transport->local_max_frame > transport->input_size) {
- more = transport->local_max_frame - transport->input_size;
+ more = pn_min(transport->input_size, transport->local_max_frame - transport->input_size);
}
if (more) {
- char *newbuf = (char *) malloc( transport->input_size + more );
+ char *newbuf = (char *) realloc( transport->input_buf, transport->input_size + more );
if (newbuf) {
- memmove( newbuf, transport->input_buf, transport->input_pending );
- free( transport->input_buf );
transport->input_buf = newbuf;
transport->input_size += more;
- capacity = more;
+ capacity += more;
}
}
}
@@ -2024,7 +2016,7 @@ char *pn_transport_tail(pn_transport_t *
return NULL;
}
-int pn_transport_push(pn_transport_t *transport, const char *src, size_t size)
+ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t size)
{
assert(transport);
@@ -2032,14 +2024,19 @@ int pn_transport_push(pn_transport_t *tr
if (capacity < 0) {
return capacity;
} else if (size > (size_t) capacity) {
- return PN_OVERFLOW;
+ size = capacity;
}
char *dst = pn_transport_tail(transport);
assert(dst);
memmove(dst, src, size);
- return pn_transport_process(transport, size);
+ int n = pn_transport_process(transport, size);
+ if (n < 0) {
+ return n;
+ } else {
+ return size;
+ }
}
int pn_transport_process(pn_transport_t *transport, size_t size)
@@ -2062,8 +2059,7 @@ int pn_transport_process(pn_transport_t
int pn_transport_close_tail(pn_transport_t *transport)
{
transport->tail_closed = true;
- ssize_t x = transport_consume( transport );
- if (x < 0) return (int) x;
+ transport_consume( transport );
return 0;
// XXX: what if not all input processed at this point? do we care???
}
@@ -2084,7 +2080,7 @@ const char *pn_transport_head(pn_transpo
return NULL;
}
-int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
+ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
{
assert(transport);
@@ -2092,7 +2088,7 @@ int pn_transport_peek(pn_transport_t *tr
if (pending < 0) {
return pending;
} else if (size > (size_t) pending) {
- return PN_UNDERFLOW;
+ size = pending;
}
if (pending > 0) {
@@ -2101,7 +2097,7 @@ int pn_transport_peek(pn_transport_t *tr
memmove(dst, src, size);
}
- return 0;
+ return size;
}
void pn_transport_pop(pn_transport_t *transport, size_t size)
@@ -2120,11 +2116,7 @@ void pn_transport_pop(pn_transport_t *tr
int pn_transport_close_head(pn_transport_t *transport)
{
transport->head_closed = true;
- if (transport->close_sent && transport->output_pending == 0) {
- return 0;
- } else {
- return pn_error_set(transport->error, PN_ERR, "connection aborted");
- }
+ return 0;
}
// true if the transport will not generate further output
Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/types.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/types.c?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/types.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/types.c Sat Sep 6 11:23:10 2014
@@ -23,20 +23,8 @@
#include <stdlib.h>
#include <string.h>
-pn_bytes_t pn_bytes(size_t size, char *start)
+pn_bytes_t pn_bytes(size_t size, const char *start)
{
pn_bytes_t bytes = {size, start};
return bytes;
}
-
-pn_bytes_t pn_bytes_dup(size_t size, const char *start)
-{
- if (size && start)
- {
- char *dup = (char *) malloc(size);
- memmove(dup, start, size);
- return pn_bytes(size, dup);
- } else {
- return pn_bytes(0, NULL);
- }
-}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/util.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/util.c?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/util.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/util.c Sat Sep 6 11:23:10 2014
@@ -139,20 +139,28 @@ void pni_urldecode(const char *src, char
// Parse URL syntax:
// [ <scheme> :// ] [ <user> [ : <password> ] @ ] <host> [ : <port> ] [ / <path> ]
-// <user>, <password>, <host>, <port> cannot contain any of '@', ':', '/'
+// <scheme>, <user>, <password>, <port> cannot contain any of '@', ':', '/'
+// If the first character of <host> is '[' then it can contain any character up to ']' (this is to allow IPv6
+// literal syntax). Otherwise it also cannot contain '@', ':', '/'
+// <host> is not optional but it can be null! If it is not present an empty string will be returned
// <path> can contain any character
void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path)
{
if (!url) return;
- char *scheme_end = strstr(url, "://");
- if (scheme_end) {
- *scheme_end = '\0';
- *scheme = url;
- url = scheme_end + 3;
+ char *slash = strchr(url, '/');
+
+ if (slash && slash>url) {
+ char *scheme_end = strstr(slash-1, "://");
+
+ if (scheme_end && scheme_end<slash) {
+ *scheme_end = '\0';
+ *scheme = url;
+ url = scheme_end + 3;
+ slash = strchr(url, '/');
+ }
}
- char *slash = strchr(url, '/');
if (slash) {
*slash = '\0';
*path = slash + 1;
Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/io.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/io.c?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/io.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/io.c Sat Sep 6 11:23:10 2014
@@ -27,32 +27,47 @@
#error "Proton requires Windows API support for XP or later."
#endif
#include <winsock2.h>
+#include <mswsock.h>
#include <Ws2tcpip.h>
#define PN_WINAPI
-#include "../platform.h"
+#include "platform.h"
#include <proton/io.h>
#include <proton/object.h>
+#include <proton/selector.h>
+#include "iocp.h"
+#include "util.h"
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <assert.h>
-static int pni_error_from_wsaerr(pn_error_t *error, const char *msg) {
- errno = WSAGetLastError();
- return pn_i_error_from_errno(error, msg);
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+ // Error code can be from GetLastError or WSAGetLastError,
+ char err[1024] = {0};
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+ FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+ return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
+static void io_log(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fflush(stderr);
}
-
-#define MAX_HOST (1024)
-#define MAX_SERV (64)
-
struct pn_io_t {
- char host[MAX_HOST];
- char serv[MAX_SERV];
+ char host[NI_MAXHOST];
+ char serv[NI_MAXSERV];
pn_error_t *error;
+ bool trace;
bool wouldblock;
+ iocp_t *iocp;
};
void pn_io_initialize(void *obj)
@@ -60,21 +75,24 @@ void pn_io_initialize(void *obj)
pn_io_t *io = (pn_io_t *) obj;
io->error = pn_error();
io->wouldblock = false;
+ io->trace = pn_env_bool("PN_TRACE_DRV");
/* Request WinSock 2.2 */
WORD wsa_ver = MAKEWORD(2, 2);
WSADATA unused;
int err = WSAStartup(wsa_ver, &unused);
if (err) {
- pni_error_from_wsaerr(io->error, "pipe");
- fprintf(stderr, "Can't load WinSock: %d\n", err);
+ pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
+ fprintf(stderr, "Can't load WinSock: %d\n", pn_error_text(io->error));
}
+ io->iocp = pni_iocp();
}
void pn_io_finalize(void *obj)
{
pn_io_t *io = (pn_io_t *) obj;
pn_error_free(io->error);
+ pn_free(io->iocp);
WSACleanup();
}
@@ -84,7 +102,7 @@ void pn_io_finalize(void *obj)
pn_io_t *pn_io(void)
{
- static pn_class_t clazz = PN_CLASS(pn_io);
+ static const pn_class_t clazz = PN_CLASS(pn_io);
pn_io_t *io = (pn_io_t *) pn_new(sizeof(pn_io_t), &clazz);
return io;
}
@@ -100,20 +118,40 @@ pn_error_t *pn_io_error(pn_io_t *io)
return io->error;
}
+static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
+{
+ // A brand new socket can have the same HANDLE value as a previous
+ // one after a socketclose. If the application closes one itself
+ // (i.e. not using pn_close), we don't find out about it until here.
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket);
+ if (iocpd) {
+ if (io->trace)
+ io_log("Stale external socket reference discarded\n");
+ // Re-use means former socket instance was closed
+ assert(iocpd->ops_in_progress == 0);
+ assert(iocpd->external);
+ // Clean up the straggler as best we can
+ pn_socket_t sock = iocpd->socket;
+ iocpd->socket = INVALID_SOCKET;
+ pni_iocpdesc_map_del(io->iocp, sock); // may free the iocpdesc_t depending on refcount
+ }
+}
+
+
/*
- * Windows pipes don't work with select(), so a socket based pipe
- * workaround is provided. They do work with completion ports, so the
- * workaround can be disposed with in future.
+ * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
+ * now that select() is no longer used. If interrupt semantics are all that is
+ * needed, a simple user space counter and reserved completion status would
+ * probably suffice.
*/
-static int pni_socket_pair(SOCKET sv[2]);
+static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
int pn_pipe(pn_io_t *io, pn_socket_t *dest)
{
- int n = pni_socket_pair(dest);
+ int n = pni_socket_pair(io, dest);
if (n) {
- pni_error_from_wsaerr(io->error, "pipe");
+ pni_win32_error(io->error, "pipe", WSAGetLastError());
}
-
return n;
}
@@ -125,9 +163,14 @@ static void pn_configure_sock(pn_io_t *i
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
perror("setsockopt");
}
+
+ u_long nonblock = 1;
+ if (ioctlsocket(sock, FIONBIO, &nonblock)) {
+ perror("ioctlsocket");
+ }
}
-static inline pn_socket_t pn_create_socket(void);
+static inline pn_socket_t pni_create_socket();
pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
{
@@ -138,34 +181,43 @@ pn_socket_t pn_listen(pn_io_t *io, const
return INVALID_SOCKET;
}
- pn_socket_t sock = pn_create_socket();
+ pn_socket_t sock = pni_create_socket();
if (sock == INVALID_SOCKET) {
- pni_error_from_wsaerr(io->error, "pn_create_socket");
+ pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
return INVALID_SOCKET;
}
+ ensure_unique(io, sock);
- BOOL optval = 1;
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &optval, sizeof(optval)) == -1) {
- pni_error_from_wsaerr(io->error, "setsockopt");
+ bool optval = 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
+ sizeof(optval)) == -1) {
+ pni_win32_error(io->error, "setsockopt", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
- pni_error_from_wsaerr(io->error, "bind");
+ pni_win32_error(io->error, "bind", WSAGetLastError());
freeaddrinfo(addr);
closesocket(sock);
return INVALID_SOCKET;
}
-
freeaddrinfo(addr);
if (listen(sock, 50) == -1) {
- pni_error_from_wsaerr(io->error, "listen");
+ pni_win32_error(io->error, "listen", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
+ iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+ if (!iocpd) {
+ pn_i_error_from_errno(io->error, "register");
+ closesocket(sock);
+ return INVALID_SOCKET;
+ }
+
+ pni_iocpdesc_start(iocpd);
return sock;
}
@@ -181,66 +233,83 @@ pn_socket_t pn_connect(pn_io_t *io, cons
return INVALID_SOCKET;
}
- pn_socket_t sock = pn_create_socket();
+ pn_socket_t sock = pni_create_socket();
if (sock == INVALID_SOCKET) {
- pni_error_from_wsaerr(io->error, "pn_create_socket");
+ pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
+ freeaddrinfo(addr);
return INVALID_SOCKET;
}
+ ensure_unique(io, sock);
pn_configure_sock(io, sock);
-
- if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
- if (WSAGetLastError() != WSAEWOULDBLOCK) {
- pni_error_from_wsaerr(io->error, "connect");
- freeaddrinfo(addr);
- closesocket(sock);
- return INVALID_SOCKET;
- }
- }
-
- freeaddrinfo(addr);
-
- return sock;
+ return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
}
-pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size)
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
{
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
socklen_t addrlen = sizeof(addr);
- pn_socket_t sock = accept(socket, (struct sockaddr *) &addr, &addrlen);
- if (sock == INVALID_SOCKET) {
- pni_error_from_wsaerr(io->error, "accept");
- return sock;
+ iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
+ pn_socket_t accept_sock;
+
+ if (listend)
+ accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error);
+ else {
+ // User supplied socket
+ accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
+ if (accept_sock == INVALID_SOCKET)
+ pni_win32_error(io->error, "sync accept", WSAGetLastError());
+ }
+
+ if (accept_sock == INVALID_SOCKET)
+ return accept_sock;
+
+ int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
+ io->serv, NI_MAXSERV, 0);
+ if (code)
+ code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
+ io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
+ if (code) {
+ pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+ pn_close(io, accept_sock);
+ return INVALID_SOCKET;
} else {
- int code;
- if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, MAX_HOST, io->serv, MAX_SERV, 0))) {
- pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
- if (closesocket(sock) == -1)
- pni_error_from_wsaerr(io->error, "closesocket");
- return INVALID_SOCKET;
- } else {
- pn_configure_sock(io, sock);
- snprintf(name, size, "%s:%s", io->host, io->serv);
- return sock;
+ pn_configure_sock(io, accept_sock);
+ snprintf(name, size, "%s:%s", io->host, io->serv);
+ if (listend) {
+ pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock));
}
+ return accept_sock;
}
}
-static inline pn_socket_t pn_create_socket(void) {
+static inline pn_socket_t pni_create_socket() {
return socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
}
ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
- ssize_t count = send(sockfd, (const char *) buf, len, 0);
- io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+ ssize_t count;
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
+ if (iocpd) {
+ count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
+ } else {
+ count = send(sockfd, (const char *) buf, len, 0);
+ io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+ }
return count;
}
ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
{
- ssize_t count = recv(socket, (char *) buf, size, 0);
- io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+ ssize_t count;
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+ if (iocpd) {
+ count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
+ } else {
+ count = recv(socket, (char *) buf, size, 0);
+ io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+ }
return count;
}
@@ -257,7 +326,12 @@ ssize_t pn_read(pn_io_t *io, pn_socket_t
void pn_close(pn_io_t *io, pn_socket_t socket)
{
- closesocket(socket);
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+ if (iocpd)
+ pni_iocp_begin_close(iocpd);
+ else {
+ closesocket(socket);
+ }
}
bool pn_wouldblock(pn_io_t *io)
@@ -265,8 +339,24 @@ bool pn_wouldblock(pn_io_t *io)
return io->wouldblock;
}
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+ if (io->iocp->selector == NULL)
+ io->iocp->selector = pni_selector_create(io->iocp);
+ return io->iocp->selector;
+}
-static int pni_socket_pair (SOCKET sv[2]) {
+static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
+{
+ u_long v = 1;
+ ioctlsocket (sock, FIONBIO, &v);
+ ensure_unique(io, sock);
+ iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+ pni_iocpdesc_start(iocpd);
+}
+
+
+static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
// no socketpair on windows. provide pipe() semantics using sockets
SOCKET sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
@@ -330,9 +420,9 @@ static int pni_socket_pair (SOCKET sv[2]
}
}
- u_long v = 1;
- ioctlsocket (sv[0], FIONBIO, &v);
- ioctlsocket (sv[1], FIONBIO, &v);
+ configure_pipe_socket(io, sv[0]);
+ configure_pipe_socket(io, sv[1]);
closesocket(sock);
return 0;
}
+
Added: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c Sat Sep 6 11:23:10 2014
@@ -0,0 +1,1138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+#define PN_WINAPI
+
+#include "../platform.h"
+#include <proton/object.h>
+#include <proton/io.h>
+#include <proton/selector.h>
+#include <proton/error.h>
+#include <proton/transport.h>
+#include "iocp.h"
+#include "util.h"
+#include <assert.h>
+
+/*
+ * Windows IO Completion Port support for Proton.
+ *
+ * Overlapped writes are used to avoid lengthy stalls between write
+ * completion and starting a new write. Non-overlapped reads are used
+ * since Windows accumulates inbound traffic without stalling and
+ * managing read buffers would not avoid a memory copy at the pn_read
+ * boundary.
+ */
+
+// Max number of overlapped accepts per listener
+#define IOCP_MAX_ACCEPTS 10
+
+// AcceptEx squishes the local and remote addresses and optional data
+// all together when accepting the connection. Reserve enough for
+// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
+// per address is required by AcceptEx.
+#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
+#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
+
+static void iocp_log(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fflush(stderr);
+}
+
+static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
+{
+ char buf[512];
+ if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
+ 0, status, 0, buf, sizeof(buf), 0))
+ pn_error_set(error, code, buf);
+ else {
+ fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
+ }
+}
+
+static void reap_check(iocpdesc_t *);
+static void bind_to_completion_port(iocpdesc_t *iocpd);
+static void iocp_shutdown(iocpdesc_t *iocpd);
+static void start_reading(iocpdesc_t *iocpd);
+static bool is_listener(iocpdesc_t *iocpd);
+static void release_sys_sendbuf(SOCKET s);
+
+static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
+{
+ pni_win32_error(iocpd->error, text, status);
+ if (iocpd->iocp->iocp_trace) {
+ iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
+ }
+ if (!is_listener(iocpd) && !iocpd->write_closed && !pni_write_pipeline_size(iocpd->pipeline))
+ iocp_shutdown(iocpd);
+ iocpd->write_closed = true;
+ iocpd->read_closed = true;
+ pni_events_update(iocpd, iocpd->events | PN_READABLE | PN_WRITABLE);
+}
+
+// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()
+static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
+{
+ GUID guid = WSAID_ACCEPTEX;
+ DWORD bytes = 0;
+ LPFN_ACCEPTEX fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
+{
+ GUID guid = WSAID_CONNECTEX;
+ DWORD bytes = 0;
+ LPFN_CONNECTEX fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
+{
+ GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
+ DWORD bytes = 0;
+ LPFN_GETACCEPTEXSOCKADDRS fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+// match accept socket to listener socket
+static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
+{
+ sockaddr_storage sa;
+ socklen_t salen = sizeof(sa);
+ if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1)
+ return NULL;
+ SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET)
+ return NULL;
+ return pni_iocpdesc_create(iocpd->iocp, s, false);
+}
+
+static bool is_listener(iocpdesc_t *iocpd)
+{
+ return iocpd && iocpd->acceptor;
+}
+
+// === Async accept processing
+
+typedef struct {
+ iocp_result_t base;
+ iocpdesc_t *new_sock;
+ char address_buffer[IOCP_SOCKADDRBUFLEN];
+ DWORD unused;
+} accept_result_t;
+
+static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
+ accept_result_t *result = (accept_result_t *) pn_new(sizeof(accept_result_t), 0);
+ memset(result, 0, sizeof(accept_result_t));
+ if (result) {
+ result->base.type = IOCP_ACCEPT;
+ result->base.iocpd = listen_sock;
+ }
+ return result;
+}
+
+static void reset_accept_result(accept_result_t *result) {
+ memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+ memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN);
+}
+
+struct pni_acceptor_t {
+ int accept_queue_size;
+ pn_list_t *accepts;
+ iocpdesc_t *listen_sock;
+ bool signalled;
+ LPFN_ACCEPTEX fn_accept_ex;
+ LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;
+};
+
+#define pni_acceptor_compare NULL
+#define pni_acceptor_inspect NULL
+#define pni_acceptor_hashcode NULL
+
+static void pni_acceptor_initialize(void *object)
+{
+ pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
+ acceptor->accepts = pn_list(IOCP_MAX_ACCEPTS, 0);
+}
+
+static void pni_acceptor_finalize(void *object)
+{
+ pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
+ size_t len = pn_list_size(acceptor->accepts);
+ for (size_t i = 0; i < len; i++)
+ pn_free(pn_list_get(acceptor->accepts, i));
+ pn_free(acceptor->accepts);
+}
+
+static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
+{
+ static const pn_class_t clazz = PN_CLASS(pni_acceptor);
+ pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_new(sizeof(pni_acceptor_t), &clazz);
+ acceptor->listen_sock = iocpd;
+ acceptor->accept_queue_size = 0;
+ acceptor->signalled = false;
+ pn_socket_t sock = acceptor->listen_sock->socket;
+ acceptor->fn_accept_ex = lookup_accept_ex(sock);
+ acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock);
+ return acceptor;
+}
+
+static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
+{
+ if (acceptor->listen_sock->closing) {
+ if (result) {
+ pn_free(result);
+ acceptor->accept_queue_size--;
+ }
+ if (acceptor->accept_queue_size == 0)
+ acceptor->signalled = true;
+ return;
+ }
+
+ if (result) {
+ reset_accept_result(result);
+ } else {
+ if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS &&
+ pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) {
+ result = accept_result(acceptor->listen_sock);
+ acceptor->accept_queue_size++;
+ } else {
+ // an async accept is still pending or max concurrent accepts already hit
+ return;
+ }
+ }
+
+ result->new_sock = create_same_type_socket(acceptor->listen_sock);
+ if (result->new_sock) {
+ // Not yet connected.
+ result->new_sock->read_closed = true;
+ result->new_sock->write_closed = true;
+
+ bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
+ result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+ &result->unused, (LPOVERLAPPED) result);
+ if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+ result->base.status = WSAGetLastError();
+ pn_list_add(acceptor->accepts, result);
+ pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE);
+ } else {
+ acceptor->listen_sock->ops_in_progress++;
+ // This socket is equally involved in the async operation.
+ result->new_sock->ops_in_progress++;
+ }
+ } else {
+ iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
+ }
+}
+
+static void complete_accept(accept_result_t *result, HRESULT status)
+{
+ result->new_sock->ops_in_progress--;
+ iocpdesc_t *ld = result->base.iocpd;
+ if (ld->read_closed) {
+ if (!result->new_sock->closing)
+ pni_iocp_begin_close(result->new_sock);
+ pn_free(result); // discard
+ reap_check(ld);
+ } else {
+ result->base.status = status;
+ pn_list_add(ld->acceptor->accepts, result);
+ pni_events_update(ld, ld->events | PN_READABLE);
+ }
+}
+
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error)
+{
+ if (!is_listener(ld)) {
+ set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+ return INVALID_SOCKET;
+ }
+ if (ld->read_closed) {
+ set_iocp_error_status(error, PN_ERR, WSAENOTSOCK);
+ return INVALID_SOCKET;
+ }
+ if (pn_list_size(ld->acceptor->accepts) == 0) {
+ if (ld->events & PN_READABLE && ld->iocp->iocp_trace)
+ iocp_log("listen socket readable with no available accept completions\n");
+ *would_block = true;
+ return INVALID_SOCKET;
+ }
+
+ accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0);
+ pn_list_del(ld->acceptor->accepts, 0, 1);
+ if (!pn_list_size(ld->acceptor->accepts))
+ pni_events_update(ld, ld->events & ~PN_READABLE); // No pending accepts
+
+ pn_socket_t accept_sock;
+ if (result->base.status) {
+ accept_sock = INVALID_SOCKET;
+ pni_win32_error(ld->error, "accept failure", result->base.status);
+ if (ld->iocp->iocp_trace)
+ iocp_log("%s\n", pn_error_text(ld->error));
+ // App never sees this socket so close it here.
+ pni_iocp_begin_close(result->new_sock);
+ } else {
+ accept_sock = result->new_sock->socket;
+ // AcceptEx special setsockopt:
+ setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket,
+ sizeof (SOCKET));
+ if (addr && addrlen && *addrlen > 0) {
+ sockaddr_storage *local_addr = NULL;
+ sockaddr_storage *remote_addr = NULL;
+ int local_addrlen, remote_addrlen;
+ LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs;
+ fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+ (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr,
+ &remote_addrlen);
+ *addrlen = pn_min(*addrlen, remote_addrlen);
+ memmove(addr, remote_addr, *addrlen);
+ }
+ }
+
+ if (accept_sock != INVALID_SOCKET) {
+ // Connected.
+ result->new_sock->read_closed = false;
+ result->new_sock->write_closed = false;
+ }
+
+ // Done with the completion result, so reuse it
+ result->new_sock = NULL;
+ begin_accept(ld->acceptor, result);
+ return accept_sock;
+}
+
+
+// === Async connect processing
+
+typedef struct {
+ iocp_result_t base;
+ char address_buffer[IOCP_SOCKADDRBUFLEN];
+ struct addrinfo *addrinfo;
+} connect_result_t;
+
+#define connect_result_initialize NULL
+#define connect_result_compare NULL
+#define connect_result_inspect NULL
+#define connect_result_hashcode NULL
+
+static void connect_result_finalize(void *object)
+{
+ connect_result_t *result = (connect_result_t *) object;
+ // Do not release addrinfo until ConnectEx completes
+ if (result->addrinfo)
+ freeaddrinfo(result->addrinfo);
+}
+
+static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
+ static const pn_class_t clazz = PN_CLASS(connect_result);
+ connect_result_t *result = (connect_result_t *) pn_new(sizeof(connect_result_t), &clazz);
+ if (result) {
+ memset(result, 0, sizeof(connect_result_t));
+ result->base.type = IOCP_CONNECT;
+ result->base.iocpd = iocpd;
+ result->addrinfo = addr;
+ }
+ return result;
+}
+
+pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
+{
+ // addr lives for the duration of the async connect. Caller has passed ownership here.
+ // See connect_result_finalize().
+ // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
+ sockaddr_storage sa;
+ memset(&sa, 0, sizeof(sa));
+ sa.ss_family = addr->ai_family;
+ if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
+ pni_win32_error(error, "begin async connection", WSAGetLastError());
+ if (iocp->iocp_trace)
+ iocp_log("%s\n", pn_error_text(error));
+ closesocket(sock);
+ freeaddrinfo(addr);
+ return INVALID_SOCKET;
+ }
+
+ iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false);
+ bind_to_completion_port(iocpd);
+ LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
+ connect_result_t *result = connect_result(iocpd, addr);
+ DWORD unused;
+ bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
+ NULL, 0, &unused, (LPOVERLAPPED) result);
+ if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+ pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
+ pn_free(result);
+ iocpd->write_closed = true;
+ iocpd->read_closed = true;
+ pni_iocp_begin_close(iocpd);
+ sock = INVALID_SOCKET;
+ if (iocp->iocp_trace)
+ iocp_log("%s\n", pn_error_text(error));
+ } else {
+ iocpd->ops_in_progress++;
+ }
+ return sock;
+}
+
+static void complete_connect(connect_result_t *result, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ if (iocpd->closing) {
+ pn_free(result);
+ reap_check(iocpd);
+ return;
+ }
+
+ if (status) {
+ iocpdesc_fail(iocpd, status, "Connect failure");
+ } else {
+ release_sys_sendbuf(iocpd->socket);
+ if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
+ iocpdesc_fail(iocpd, WSAGetLastError(), "Connect failure (update context)");
+ } else {
+ pni_events_update(iocpd, PN_WRITABLE);
+ start_reading(iocpd);
+ }
+ }
+ pn_free(result);
+ return;
+}
+
+
+// === Async writes
+
+static bool write_in_progress(iocpdesc_t *iocpd)
+{
+ return pni_write_pipeline_size(iocpd->pipeline) != 0;
+}
+
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
+{
+ write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1);
+ if (result) {
+ result->base.type = IOCP_WRITE;
+ result->base.iocpd = iocpd;
+ result->buffer.start = buf;
+ result->buffer.size = buflen;
+ }
+ return result;
+}
+
+static int submit_write(write_result_t *result, const void *buf, size_t len)
+{
+ WSABUF wsabuf;
+ wsabuf.buf = (char *) buf;
+ wsabuf.len = len;
+ memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+ return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0,
+ (LPOVERLAPPED) result, 0);
+}
+
+ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
+{
+ if (len == 0) return 0;
+ *would_block = false;
+ if (is_listener(iocpd)) {
+ set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+ return INVALID_SOCKET;
+ }
+ if (iocpd->closing) {
+ set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+ return SOCKET_ERROR;
+ }
+ if (iocpd->write_closed) {
+ assert(pn_error_code(iocpd->error));
+ pn_error_copy(error, iocpd->error);
+ if (iocpd->iocp->iocp_trace)
+ iocp_log("write error: %s\n", pn_error_text(error));
+ return SOCKET_ERROR;
+ }
+ if (len == 0) return 0;
+ if (!(iocpd->events & PN_WRITABLE)) {
+ *would_block = true;
+ return SOCKET_ERROR;
+ }
+
+ size_t written = 0;
+ size_t requested = len;
+ const char *outgoing = (const char *) buf;
+ size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
+ if (!available) {
+ *would_block = true;
+ return SOCKET_ERROR;
+ }
+
+ for (size_t wr_count = 0; wr_count < available; wr_count++) {
+ write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
+ assert(result);
+ result->base.iocpd = iocpd;
+ ssize_t actual_len = pn_min(len, result->buffer.size);
+ result->requested = actual_len;
+ memmove((void *)result->buffer.start, outgoing, actual_len);
+ outgoing += actual_len;
+ written += actual_len;
+ len -= actual_len;
+
+ int werror = submit_write(result, result->buffer.start, actual_len);
+ if (werror && WSAGetLastError() != ERROR_IO_PENDING) {
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send");
+ return SOCKET_ERROR;
+ }
+ iocpd->ops_in_progress++;
+ }
+
+ if (!pni_write_pipeline_writable(iocpd->pipeline))
+ pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+ return written;
+}
+
+static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ if (iocpd->closing) {
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ if (!iocpd->write_closed && !write_in_progress(iocpd))
+ iocp_shutdown(iocpd);
+ reap_check(iocpd);
+ return;
+ }
+ if (status == 0 && xfer_count > 0) {
+ if (xfer_count != result->requested) {
+ // Is this recoverable? How to preserve order if multiple overlapped writes?
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
+ return;
+ } else {
+ // Success.
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ if (pni_write_pipeline_writable(iocpd->pipeline))
+ pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
+ return;
+ }
+ }
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ iocpdesc_fail(iocpd, status, "IOCP async write error");
+}
+
+
+// === Async reads
+
+struct read_result_t {
+ iocp_result_t base;
+ size_t drain_count;
+ char unused_buf[1];
+};
+
+static read_result_t *read_result(iocpdesc_t *iocpd)
+{
+ read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1);
+ if (result) {
+ result->base.type = IOCP_READ;
+ result->base.iocpd = iocpd;
+ }
+ return result;
+}
+
+static void begin_zero_byte_read(iocpdesc_t *iocpd)
+{
+ if (iocpd->read_in_progress) return;
+ if (iocpd->read_closed) {
+ pni_events_update(iocpd, iocpd->events | PN_READABLE);
+ return;
+ }
+
+ read_result_t *result = iocpd->read_result;
+ memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+ DWORD flags = 0;
+ WSABUF wsabuf;
+ wsabuf.buf = result->unused_buf;
+ wsabuf.len = 0;
+ int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags,
+ &result->base.overlapped, 0);
+ if (rc && WSAGetLastError() != ERROR_IO_PENDING) {
+ iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
+ return;
+ }
+ iocpd->ops_in_progress++;
+ iocpd->read_in_progress = true;
+}
+
+static void drain_until_closed(iocpdesc_t *iocpd) {
+ int max_drain = 16 * 1024;
+ char buf[512];
+ read_result_t *result = iocpd->read_result;
+ while (result->drain_count < max_drain) {
+ int rv = recv(iocpd->socket, buf, 512, 0);
+ if (rv > 0)
+ result->drain_count += rv;
+ else if (rv == 0) {
+ iocpd->read_closed = true;
+ return;
+ } else if (WSAGetLastError() == WSAEWOULDBLOCK) {
+ // wait a little longer
+ start_reading(iocpd);
+ return;
+ }
+ else
+ break;
+ }
+ // Graceful close indication unlikely, force the issue
+ if (iocpd->iocp->iocp_trace)
+ if (result->drain_count >= max_drain)
+ iocp_log("graceful close on reader abandoned (too many chars)\n");
+ else
+ iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
+ iocpd->read_closed = true;
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+}
+
+
+static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ iocpd->read_in_progress = false;
+
+ if (iocpd->closing) {
+ // Application no longer reading, but we are looking for a zero length read
+ if (!iocpd->read_closed)
+ drain_until_closed(iocpd);
+ reap_check(iocpd);
+ return;
+ }
+
+ if (status == 0 && xfer_count == 0) {
+ // Success.
+ pni_events_update(iocpd, iocpd->events | PN_READABLE);
+ } else {
+ iocpdesc_fail(iocpd, status, "IOCP read complete error");
+ }
+}
+
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
+{
+ if (size == 0) return 0;
+ *would_block = false;
+ if (is_listener(iocpd)) {
+ set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+ return SOCKET_ERROR;
+ }
+ if (iocpd->closing) {
+ // Previous call to pn_close()
+ set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+ return SOCKET_ERROR;
+ }
+ if (iocpd->read_closed) {
+ if (pn_error_code(iocpd->error))
+ pn_error_copy(error, iocpd->error);
+ else
+ set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
+ return SOCKET_ERROR;
+ }
+
+ size_t count = recv(iocpd->socket, (char *) buf, size, 0);
+ if (count > 0) {
+ pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
+ begin_zero_byte_read(iocpd);
+ return count;
+ } else if (count == 0) {
+ iocpd->read_closed = true;
+ return 0;
+ }
+ if (WSAGetLastError() == WSAEWOULDBLOCK)
+ *would_block = true;
+ else
+ set_iocp_error_status(error, PN_ERR, WSAGetLastError());
+ return SOCKET_ERROR;
+}
+
+static void start_reading(iocpdesc_t *iocpd)
+{
+ begin_zero_byte_read(iocpd);
+}
+
+
+// === The iocp descriptor
+
+static void pni_iocpdesc_initialize(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ memset(iocpd, 0, sizeof(iocpdesc_t));
+ iocpd->socket = INVALID_SOCKET;
+}
+
+static void pni_iocpdesc_finalize(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ pn_free(iocpd->acceptor);
+ pn_error_free(iocpd->error);
+ if (iocpd->pipeline)
+ if (write_in_progress(iocpd))
+ iocp_log("iocp descriptor write leak\n");
+ else
+ pn_free(iocpd->pipeline);
+ if (iocpd->read_in_progress)
+ iocp_log("iocp descriptor read leak\n");
+ else
+ free(iocpd->read_result);
+}
+
+static uintptr_t pni_iocpdesc_hashcode(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ return iocpd->socket;
+}
+
+#define pni_iocpdesc_compare NULL
+#define pni_iocpdesc_inspect NULL
+
+// Reference counted in the iocpdesc map, zombie_list, selector.
+static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
+{
+ static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
+ assert (s != INVALID_SOCKET);
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_new(sizeof(iocpdesc_t), &clazz);
+ assert(iocpd);
+ iocpd->socket = s;
+ return iocpd;
+}
+
+static bool is_listener_socket(pn_socket_t s)
+{
+ BOOL tval = false;
+ int tvalsz = sizeof(tval);
+ int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz);
+ return code == 0 && tval;
+}
+
+iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
+ assert(!pni_iocpdesc_map_get(iocp, s));
+ bool listening = is_listener_socket(s);
+ iocpdesc_t *iocpd = pni_iocpdesc(s);
+ iocpd->iocp = iocp;
+ if (iocpd) {
+ iocpd->external = external;
+ iocpd->error = pn_error();
+ if (listening) {
+ iocpd->acceptor = pni_acceptor(iocpd);
+ } else {
+ iocpd->pipeline = pni_write_pipeline(iocpd);
+ iocpd->read_result = read_result(iocpd);
+ }
+ pni_iocpdesc_map_push(iocpd);
+ }
+ return iocpd;
+}
+
+// === Fast lookup of a socket's iocpdesc_t
+
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s);
+ return iocpd;
+}
+
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd) {
+ pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd);
+ pn_decref(iocpd);
+ assert(pn_refcount(iocpd) == 1);
+}
+
+void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) {
+ pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s);
+}
+
+static void bind_to_completion_port(iocpdesc_t *iocpd)
+{
+ if (iocpd->bound) return;
+ if (!iocpd->iocp->completion_port) {
+ iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
+ return;
+ }
+
+ if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0))
+ iocpd->bound = true;
+ else {
+ iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
+ }
+}
+
+static void release_sys_sendbuf(SOCKET s)
+{
+ // Set the socket's send buffer size to zero.
+ int sz = 0;
+ int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int));
+ assert(status == 0);
+}
+
+void pni_iocpdesc_start(iocpdesc_t *iocpd)
+{
+ if (iocpd->bound) return;
+ bind_to_completion_port(iocpd);
+ if (is_listener(iocpd)) {
+ begin_accept(iocpd->acceptor, NULL);
+ }
+ else {
+ release_sys_sendbuf(iocpd->socket);
+ pni_events_update(iocpd, PN_WRITABLE);
+ start_reading(iocpd);
+ }
+}
+
+static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
+ result->iocpd->ops_in_progress--;
+ DWORD status = success ? 0 : GetLastError();
+
+ switch (result->type) {
+ case IOCP_ACCEPT:
+ complete_accept((accept_result_t *) result, status);
+ break;
+ case IOCP_CONNECT:
+ complete_connect((connect_result_t *) result, status);
+ break;
+ case IOCP_WRITE:
+ complete_write((write_result_t *) result, num_transferred, status);
+ break;
+ case IOCP_READ:
+ complete_read((read_result_t *) result, num_transferred, status);
+ break;
+ default:
+ assert(false);
+ }
+}
+
+void pni_iocp_drain_completions(iocp_t *iocp)
+{
+ while (true) {
+ DWORD timeout_ms = 0;
+ DWORD num_transferred = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+ &completion_key, &overlapped, timeout_ms);
+ if (!overlapped)
+ return; // timed out
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ complete(result, good_op, num_transferred);
+ }
+}
+
+// returns: -1 on error, 0 on timeout, 1 successful completion
+int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
+ DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
+ DWORD num_transferred = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+ &completion_key, &overlapped, win_timeout);
+ if (!overlapped)
+ if (GetLastError() == WAIT_TIMEOUT)
+ return 0;
+ else {
+ if (error)
+ pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError());
+ return -1;
+ }
+
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ complete(result, good_op, num_transferred);
+ return 1;
+}
+
+// === Close (graceful and otherwise)
+
+// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
+// and fully closed.
+
+static void zombie_list_add(iocpdesc_t *iocpd)
+{
+ assert(iocpd->closing);
+ if (!iocpd->ops_in_progress) {
+ // No need to make a zombie.
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ }
+ return;
+ }
+ // Allow 2 seconds for graceful shutdown before releasing socket resource.
+ iocpd->reap_time = pn_i_now() + 2000;
+ pn_list_add(iocpd->iocp->zombie_list, iocpd);
+}
+
+static void reap_check(iocpdesc_t *iocpd)
+{
+ if (iocpd->closing && !iocpd->ops_in_progress) {
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ }
+ pn_list_remove(iocpd->iocp->zombie_list, iocpd);
+ // iocpd is decref'ed and possibly released
+ }
+}
+
+pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
+{
+ if (pn_list_size(iocp->zombie_list)) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
+ return iocpd->reap_time;
+ }
+ return 0;
+}
+
+void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
+{
+ pn_list_t *zl = iocp->zombie_list;
+ // Look for stale zombies that should have been reaped by "now"
+ for (size_t idx = 0; idx < pn_list_size(zl); idx++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx);
+ if (iocpd->reap_time > now)
+ return;
+ if (iocpd->socket == INVALID_SOCKET)
+ continue;
+ assert(iocpd->ops_in_progress > 0);
+ if (iocp->iocp_trace)
+ iocp_log("async close: graceful close timeout exceeded\n");
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ // outstanding ops should complete immediately now
+ }
+}
+
+static void drain_zombie_completions(iocp_t *iocp)
+{
+ // No more pn_selector_select() from App, but zombies still need care and feeding
+ // until their outstanding async actions complete.
+ pni_iocp_drain_completions(iocp);
+
+ // Discard any that have no pending async IO
+ size_t sz = pn_list_size(iocp->zombie_list);
+ for (size_t idx = 0; idx < sz;) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
+ if (!iocpd->ops_in_progress) {
+ pn_list_del(iocp->zombie_list, idx, 1);
+ sz--;
+ } else {
+ idx++;
+ }
+ }
+
+ pn_timestamp_t now = pn_i_now();
+ pn_timestamp_t deadline = now + 2000;
+
+ while (pn_list_size(iocp->zombie_list)) {
+ if (now >= deadline)
+ break;
+ int rv = pni_iocp_wait_one(iocp, deadline - now, NULL);
+ if (rv < 0) {
+ iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
+ break;
+ }
+ now = pn_i_now();
+ }
+ if (now >= deadline && pn_list_size(iocp->zombie_list))
+ // Should only happen if really slow TCP handshakes, i.e. total network failure
+ iocp_log("network failure on Proton shutdown\n");
+}
+
+static pn_list_t *iocp_map_close_all(iocp_t *iocp)
+{
+ // Zombify stragglers, i.e. no pn_close() from the application.
+ pn_list_t *externals = pn_list(0, PN_REFCOUNT);
+ for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
+ entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
+ // Just listeners first.
+ if (is_listener(iocpd)) {
+ if (iocpd->external) {
+ // Owned by application, just keep a temporary reference to it.
+ // iocp_result_t structs must not be free'd until completed or
+ // the completion port is closed.
+ if (iocpd->ops_in_progress)
+ pn_list_add(externals, iocpd);
+ pni_iocpdesc_map_del(iocp, iocpd->socket);
+ } else {
+ // Make it a zombie.
+ pni_iocp_begin_close(iocpd);
+ }
+ }
+ }
+ pni_iocp_drain_completions(iocp);
+
+ for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
+ entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
+ if (iocpd->external) {
+ iocpd->read_closed = true; // Do not consume from read side
+ iocpd->write_closed = true; // Do not shutdown write side
+ if (iocpd->ops_in_progress)
+ pn_list_add(externals, iocpd);
+ pni_iocpdesc_map_del(iocp, iocpd->socket);
+ } else {
+ // Make it a zombie.
+ pni_iocp_begin_close(iocpd);
+ }
+ }
+ return externals;
+}
+
+static void zombie_list_hard_close_all(iocp_t *iocp)
+{
+ pni_iocp_drain_completions(iocp);
+ size_t zs = pn_list_size(iocp->zombie_list);
+ for (size_t i = 0; i < zs; i++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ iocpd->write_closed = true;
+ }
+ }
+ pni_iocp_drain_completions(iocp);
+
+ // Zombies should be all gone. Do a sanity check.
+ zs = pn_list_size(iocp->zombie_list);
+ int remaining = 0;
+ int ops = 0;
+ for (size_t i = 0; i < zs; i++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+ remaining++;
+ ops += iocpd->ops_in_progress;
+ }
+ if (remaining)
+ iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
+}
+
+static void iocp_shutdown(iocpdesc_t *iocpd)
+{
+ if (shutdown(iocpd->socket, SD_SEND)) {
+ if (iocpd->iocp->iocp_trace)
+ iocp_log("socket shutdown failed %d\n", WSAGetLastError());
+ }
+ iocpd->write_closed = true;
+ if (iocpd->read_closed) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ }
+}
+
+void pni_iocp_begin_close(iocpdesc_t *iocpd)
+{
+ assert (!iocpd->closing);
+ if (is_listener(iocpd)) {
+ // Listening socket is easy. Close the socket which will cancel async ops.
+ pn_socket_t old_sock = iocpd->socket;
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->closing = true;
+ iocpd->read_closed = true;
+ iocpd->write_closed = true;
+ closesocket(old_sock);
+ // Pending accepts will now complete. Zombie can die when all consumed.
+ zombie_list_add(iocpd);
+ pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd
+ } else {
+ // Continue async operation looking for graceful close confirmation or timeout.
+ pn_socket_t old_sock = iocpd->socket;
+ iocpd->closing = true;
+ if (!iocpd->write_closed && !write_in_progress(iocpd))
+ iocp_shutdown(iocpd);
+ zombie_list_add(iocpd);
+ pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd
+ }
+}
+
+
+// === iocp_t
+
+#define pni_iocp_hashcode NULL
+#define pni_iocp_compare NULL
+#define pni_iocp_inspect NULL
+
+void pni_iocp_initialize(void *obj)
+{
+ iocp_t *iocp = (iocp_t *) obj;
+ memset(iocp, 0, sizeof(iocp_t));
+ pni_shared_pool_create(iocp);
+ iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ assert(iocp->completion_port != NULL);
+ iocp->iocpdesc_map = pn_hash(0, 0.75, PN_REFCOUNT);
+ iocp->zombie_list = pn_list(0, PN_REFCOUNT);
+ iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV");
+ iocp->selector = NULL;
+}
+
+void pni_iocp_finalize(void *obj)
+{
+ iocp_t *iocp = (iocp_t *) obj;
+ // Move sockets to closed state, except external sockets.
+ pn_list_t *externals = iocp_map_close_all(iocp);
+ // Now everything with ops_in_progress is in the zombie_list or the externals list.
+ assert(!pn_hash_head(iocp->iocpdesc_map));
+ pn_free(iocp->iocpdesc_map);
+
+ drain_zombie_completions(iocp); // Last chance for graceful close
+ zombie_list_hard_close_all(iocp);
+ CloseHandle(iocp->completion_port); // This cancels all our async ops
+ iocp->completion_port = NULL;
+
+ if (pn_list_size(externals) && iocp->iocp_trace)
+ iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals));
+
+ // Now safe to free everything that might be touched by a former async operation.
+ pn_free(externals);
+ pn_free(iocp->zombie_list);
+ pni_shared_pool_free(iocp);
+}
+
+iocp_t *pni_iocp()
+{
+ static const pn_class_t clazz = PN_CLASS(pni_iocp);
+ iocp_t *iocp = (iocp_t *) pn_new(sizeof(iocp_t), &clazz);
+ return iocp;
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.h?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.h (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.h Sat Sep 6 11:23:10 2014
@@ -0,0 +1,141 @@
+#ifndef PROTON_SRC_IOCP_H
+#define PROTON_SRC_IOCP_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/selectable.h>
+#include <proton/type_compat.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pni_acceptor_t pni_acceptor_t;
+typedef struct write_result_t write_result_t;
+typedef struct read_result_t read_result_t;
+typedef struct write_pipeline_t write_pipeline_t;
+typedef struct iocpdesc_t iocpdesc_t;
+
+
+// One per pn_io_t.
+
+struct iocp_t {
+ HANDLE completion_port;
+ pn_hash_t *iocpdesc_map;
+ pn_list_t *zombie_list;
+ int shared_pool_size;
+ char *shared_pool_memory;
+ write_result_t **shared_results;
+ write_result_t **available_results;
+ int shared_available_count;
+ size_t writer_count;
+ int loopback_bufsize;
+ bool iocp_trace;
+ pn_selector_t *selector;
+};
+
+
+// One for each socket.
+// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list,
+// selector->iocp_descriptors list. It should remain ref counted in the
+// zombie_list until ops_in_progress == 0 or the completion port is closed.
+
+struct iocpdesc_t {
+ pn_socket_t socket;
+ iocp_t *iocp;
+ pni_acceptor_t *acceptor;
+ pn_error_t *error;
+ int ops_in_progress;
+ bool read_in_progress;
+ write_pipeline_t *pipeline;
+ read_result_t *read_result;
+ bool external; // true if socket set up outside Proton
+ bool bound; // associted with the completion port
+ bool closing; // pn_close called
+ bool read_closed; // EOF or read error
+ bool write_closed; // shutdown sent or write error
+ pn_selector_t *selector;
+ pn_selectable_t *selectable;
+ int events;
+ int interests;
+ pn_timestamp_t deadline;
+ iocpdesc_t *triggered_list_next;
+ iocpdesc_t *triggered_list_prev;
+ iocpdesc_t *deadlines_next;
+ iocpdesc_t *deadlines_prev;
+ pn_timestamp_t reap_time;;
+};
+
+typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
+
+typedef struct {
+ OVERLAPPED overlapped;
+ iocp_type_t type;
+ iocpdesc_t *iocpd;
+ HRESULT status;
+} iocp_result_t;
+
+struct write_result_t {
+ iocp_result_t base;
+ size_t requested;
+ bool in_use;
+ pn_bytes_t buffer;
+};
+
+iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
+void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
+void pni_iocpdesc_start(iocpdesc_t *iocpd);
+void pni_iocp_drain_completions(iocp_t *);
+int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
+void pni_iocp_start_accepting(iocpdesc_t *iocpd);
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
+pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
+ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
+void pni_iocp_begin_close(iocpdesc_t *iocpd);
+iocp_t *pni_iocp();
+
+void pni_events_update(iocpdesc_t *iocpd, int events);
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd);
+size_t pni_write_pipeline_size(write_pipeline_t *);
+bool pni_write_pipeline_writable(write_pipeline_t *);
+void pni_write_pipeline_return(write_pipeline_t *, write_result_t *);
+size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t);
+write_result_t *pni_write_pipeline_next(write_pipeline_t *);
+void pni_shared_pool_create(iocp_t *);
+void pni_shared_pool_free(iocp_t *);
+void pni_zombie_check(iocp_t *, pn_timestamp_t);
+pn_timestamp_t pni_zombie_deadline(iocp_t *);
+
+pn_selector_t *pni_selector_create(iocp_t *iocp);
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* iocp.h */
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org