You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@etch.apache.org by fi...@apache.org on 2011/07/21 16:57:05 UTC
svn commit: r1149207 [2/3] - in
/incubator/etch/branches/singlestack/binding-c:
compiler/src/main/resources/org/apache/etch/bindings/c/compiler/ runtime/
runtime/c/ runtime/c/include/ runtime/c/src/main/
runtime/c/src/main/common/ runtime/c/src/main/tr...
Modified: incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_transport.c
URL: http://svn.apache.org/viewvc/incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_transport.c?rev=1149207&r1=1149206&r2=1149207&view=diff
==============================================================================
--- incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_transport.c (original)
+++ incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_transport.c Thu Jul 21 14:56:59 2011
@@ -31,6 +31,8 @@
#include "etch_messagizer.h"
#include "etch_tcp_server.h"
#include "etch_tcp_connection.h"
+#include "etch_udp_server.h"
+#include "etch_udp_connection.h"
#include "etch_exception.h"
#include "etch_objecttypes.h"
#include "etch_log.h"
@@ -43,22 +45,29 @@ extern apr_pool_t* g_etch_main_pool;
typedef etch_plainmailboxmgr etch_mailbox_manager;
+etch_delivery_service* get_etch_ds_impl (i_delivery_service*);
-int tcpdelsvc_init (etch_tcp_delivery_service*);
+int delsvc_init (etch_delivery_service*);
int destroy_delivery_service_interface (void*);
int destroy_delivery_service_via_interface(void*);
int destroy_tcp_delivery_service(void*);
-int tcpdelsvc_begincall(i_delivery_service*, etch_message*, i_mailbox**);
-int tcpdelsvc_endcall (i_delivery_service*, i_mailbox*, etch_type*, etch_object**);
-int tcpdelsvc_session_message (void*, etch_who*, etch_message*);
-int tcpdelsvc_session_control (void*, etch_event*, etch_object*);
-int tcpdelsvc_session_notify (void*, etch_event*);
-etch_object* tcpdelsvc_session_query (void*, etch_query*);
-i_session* tcpdelsvc_get_session(void*);
-etch_object* tcpdelsvc_transport_query (void*, etch_query*);
-int tcpdelsvc_transport_control(void*, etch_event*, etch_object*);
-int tcpdelsvc_transport_notify (void*, etch_event*);
+int destroy_udp_delivery_service(void*);
+int delsvc_begincall(i_delivery_service*, etch_message*, i_mailbox**);
+int delsvc_endcall (i_delivery_service*, i_mailbox*, etch_type*, etch_object**);
+int delsvc_session_message (void*, etch_who*, etch_message*);
+int delsvc_session_control (void*, etch_event*, etch_object*);
+int delsvc_session_notify (void*, etch_event*);
+etch_object* delsvc_session_query (void*, etch_query*);
+i_session* delsvc_get_session(void*);
+
+int delsvc_transport_control(void*, etch_event*, etch_object*);
+int delsvc_transport_notify(void*, etch_event*);
+etch_object* delsvc_transport_query(void*, etch_query*);
+etch_session* remove_etch_session (etch_server_factory*, const int session_id);
+
+i_delivery_service* new_etch_transport_a(etch_url* url, etch_factory_params*, void* conximpl);
+i_sessionlistener* new_etch_listener_a (etch_url* url, etch_resources* resources, void* factory_thisx, helper_listener_create_func, main_server_create_func, helper_resources_init_func, new_server_func new_server_create);
/* - - - - - - - - - - - - - - - - - - - - - -
* delivery service
@@ -101,7 +110,12 @@ i_delivery_service* new_etch_transport_a
if (is_url_scheme_udp(url))
{
- /* not yet implemented */
+ etch_udp_delivery_service* udpds = new_udp_delivery_service (url, params, conximpl);
+
+ if (udpds) {
+ newds = udpds->ids;
+ newds->thisx = udpds;
+ }
}
#if(0)
else /* handlers for other url schemes follow here eventually */
@@ -149,7 +163,7 @@ etch_tcp_delivery_service* new_tcp_deliv
etch_messagizer* messagizer = NULL;
etch_mailbox_manager* mboxmgr = NULL;
etch_tcp_delivery_service* delsvc = NULL;
- const int is_tcpconx_owned = tcpconx == NULL;
+ const int is_connection_owned = tcpconx == NULL;
ETCH_ASSERT(params && params->in_resx);
resources = params->in_resx;
@@ -192,9 +206,9 @@ etch_tcp_delivery_service* new_tcp_deliv
delsvc->packetizer = packetizer; /* todo can we lose these refs */
delsvc->messagizer = messagizer;
delsvc->resources = resources;
- delsvc->is_tcpconx_owned = is_tcpconx_owned;
+ delsvc->is_connection_owned = is_connection_owned;
- tcpdelsvc_init (delsvc); /* initialize the delivery service interface */
+ delsvc_init ((etch_delivery_service *)delsvc); /* initialize the delivery service interface */
} while(0);
@@ -217,47 +231,121 @@ etch_tcp_delivery_service* new_tcp_deliv
return delsvc;
}
+/*
+ * new_udp_delivery_service()
+ * etch_udp_delivery_service constructor
+ * @param params server parameter bundle, caller retains.
+ * ¶m udpx if present, the already accepted client connection.
+ * if present, caller retains.
+ */
+etch_udp_delivery_service* new_udp_delivery_service (etch_url* url,
+ etch_factory_params* params, etch_udp_connection* udpconx)
+{
+ etch_resources* resources = NULL;
+ etch_messagizer* messagizer = NULL;
+ etch_mailbox_manager* mboxmgr = NULL;
+ etch_udp_delivery_service* delsvc = NULL;
+ const int is_connection_owned = udpconx == NULL;
+ ETCH_ASSERT(params && params->in_resx);
+ resources = params->in_resx;
+
+ do
+ { /* as each next higher layer of the delivery service is instantiated, it
+ * is passed passed a transport interface to the previously-instantiated
+ * layer. in each such case, note that the new layer does not own memory
+ * for the passed transport interface.
+ */
+ if (NULL == udpconx)
+ udpconx = new_udp_connection (url, params->in_resx, NULL);
+
+ ETCH_ASSERT(udpconx);
+ if (0 != init_etch_udpconx_interfaces (udpconx)) break;
+
+ messagizer = new_messagizer_a (udpconx->itp, url, resources);
+ if (NULL == messagizer) break;
+
+ mboxmgr = new_plain_mailbox_manager (messagizer->transportmsg,
+ url->raw, resources, params->mblock);
+ if (NULL == mboxmgr) break;
+
+ delsvc = (etch_udp_delivery_service*) new_delivery_service
+ (sizeof(etch_udp_delivery_service), CLASSID_TCP_DELIVERYSVC);
+
+ ((etch_object*)delsvc)->destroy = destroy_udp_delivery_service;
+
+ /* set our transport to that of the next lower layer (mailbox manager) */
+ delsvc->transport = mboxmgr->transportmsg;
+ delsvc->transportx = mboxmgr->imanager; /* todo can we lose this ref? */
+
+ delsvc->mailboxmgr = mboxmgr;
+ delsvc->udpconx = udpconx;
+ delsvc->wait_up = udpconx->cx.wait_up; /* connection up/down monitor */
+ delsvc->wait_down = udpconx->cx.wait_down; /* connection up/down monitor */
+ delsvc->rwlock = params->mblock; /* not owned */
+ delsvc->messagizer = messagizer;
+ delsvc->resources = resources;
+ delsvc->is_connection_owned = is_connection_owned;
+
+ delsvc_init ((etch_delivery_service *)delsvc); /* initialize the delivery service interface */
+
+ } while(0);
+
+ if (NULL == delsvc)
+ {
+ etch_object_destroy(udpconx);
+ udpconx = NULL;
+
+ etch_object_destroy(messagizer);
+ messagizer = NULL;
+
+ etch_object_destroy(mboxmgr);
+ mboxmgr = NULL;
+
+ }
+
+ return delsvc;
+}
+
+
/**
- * tcpdelsvc_set_session()
+ * delsvc_set_session()
* @param session the i_sessionmessage interface. caller retains ownership.
* this is generally called from the stub constructor.
*/
-void tcpdelsvc_set_session (void* data, void* sessionData)
+void delsvc_set_session (void* data, void* sessionData)
{
i_delivery_service* ids = (i_delivery_service*)data;
i_sessionmessage* session = (i_sessionmessage*)sessionData;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
ETCH_ASSERT(is_etch_sessionmsg(session));
/* set delivery service session to be the passed (stub's) session */
- tcpds->session = tcpds->ids->ism = session;
+ ds->session = ds->ids->ism = session;
}
/**
- * tcpdelsvc_transport_message()
+ * delsvc_transport_message()
* @param whoto recipient - caller retains this memory, can be null.
* @param message the message
* caller relinquishes this memory on success, retains on failure.
* @return 0 success, -1 error.
*/
-int tcpdelsvc_transport_message (void* data, void* whoData, void* messageData)
+int delsvc_transport_message (void* data, void* whoData, void* messageData)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_who* whoto = (etch_who*)whoData;
etch_message* msg = (etch_message*)messageData;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_transportmessage* dstransport = tcpds->transport;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_transportmessage* dstransport = ds->transport;
ETCH_ASSERT(is_etch_transportmsg(dstransport));
return dstransport->transport_message (dstransport->thisx, whoto, msg);
}
-
-
/**
- * tcpdelsvc_init()
+ * delsvc_init()
* initialize delivery service interface
*/
-int tcpdelsvc_init (etch_tcp_delivery_service* delsvc)
+int delsvc_init (etch_delivery_service* delsvc)
{
i_session* isession = NULL;
i_transport* itransport = NULL;
@@ -269,8 +357,8 @@ int tcpdelsvc_init (etch_tcp_delivery_se
ids->transport = delsvc->transport;
ids->session = delsvc->session;
- ids->begin_call = delsvc->begin_call = (etch_delivsvc_begincall)tcpdelsvc_begincall;
- ids->end_call = delsvc->end_call = (etch_delvisvc_endcall)tcpdelsvc_endcall;
+ ids->begin_call = delsvc->begin_call = (etch_delivsvc_begincall)delsvc_begincall;
+ ids->end_call = delsvc->end_call = (etch_delvisvc_endcall)delsvc_endcall;
/* - - - - - - - - - - - - - - -
@@ -278,16 +366,16 @@ int tcpdelsvc_init (etch_tcp_delivery_se
* - - - - - - - - - - - - - - -
*/
itransport = new_transport_interface (ids,
- tcpdelsvc_transport_control,
- tcpdelsvc_transport_notify,
- tcpdelsvc_transport_query);
+ delsvc_transport_control,
+ delsvc_transport_notify,
+ delsvc_transport_query);
delsvc->transportmsg = new_transportmsg_interface (ids,
- tcpdelsvc_transport_message,
+ delsvc_transport_message,
itransport); /* transportmsg now owns itransport */
- delsvc->transportmsg->set_session = tcpdelsvc_set_session;
- delsvc->transportmsg->get_session = tcpdelsvc_get_session;
+ delsvc->transportmsg->set_session = delsvc_set_session;
+ delsvc->transportmsg->get_session = delsvc_get_session;
/* copy native transport back to interface */
ids->itm = delsvc->transportmsg;
@@ -306,12 +394,12 @@ int tcpdelsvc_init (etch_tcp_delivery_se
* - - - - - - - - - - - - - - -
*/
isession = new_session_interface (ids,
- tcpdelsvc_session_control,
- tcpdelsvc_session_notify,
- tcpdelsvc_session_query);
+ delsvc_session_control,
+ delsvc_session_notify,
+ delsvc_session_query);
delsvc->sessionmsg = new_sessionmsg_interface (ids,
- tcpdelsvc_session_message,
+ delsvc_session_message,
isession); /* sessionmsg now owns isession */
/* copy native session back to interface */
@@ -329,7 +417,6 @@ int tcpdelsvc_init (etch_tcp_delivery_se
return 0;
}
-
/**
* new_delivery_service_interface()
* delivery service interface constructor
@@ -467,7 +554,7 @@ int destroy_tcp_delivery_service (void*
/* on server side, listen thread destroys tcpconx on exit.
* on client side, tcpconx is destroyed here. */
- if (thisx->is_tcpconx_owned){
+ if (thisx->is_connection_owned){
etch_object_destroy(thisx->tcpconx);
thisx->tcpconx = NULL;
@@ -486,6 +573,54 @@ int destroy_tcp_delivery_service (void*
return destroy_objectex((etch_object*)thisx);
}
+/**
+ * destroy_udp_delivery_service()
+ * etch_udp_delivery_service destructor
+ */
+int destroy_udp_delivery_service (void* data)
+{
+ etch_udp_delivery_service* thisx = (etch_udp_delivery_service*)data;
+ const char* thistext = "delsvc dtor";
+ if (NULL == thisx) return -1;
+
+ if (!is_etchobj_static_content(thisx))
+ {
+ /* ensure any threads referencing mailboxes (see mailbox.message())
+ * have run to completion before we start tearing it down. */
+ etchmbox_get_readlockex (thisx->rwlock, thistext);
+ etchmbox_release_readlockex (thisx->rwlock, thistext);
+
+ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying messagizer ...\n");
+
+ etch_object_destroy(((etch_messagizer*)thisx->messagizer));
+ thisx->messagizer = NULL;
+
+ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying mailbox manager ...\n");
+
+ etch_object_destroy(((etch_mailbox_manager*)thisx->mailboxmgr));
+ thisx->mailboxmgr = NULL;
+
+ /* on server side, listen thread destroys udpconx on exit.
+ * on client side, udpconx is destroyed here. */
+ if (thisx->is_connection_owned){
+
+ etch_object_destroy(thisx->udpconx);
+ thisx->udpconx = NULL;
+ }
+
+ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying delivery interface ...\n");
+ destroy_delivery_service_interface(thisx->ids);
+
+ etch_object_destroy(thisx->sessionmsg);
+ thisx->sessionmsg = NULL;
+
+ etch_object_destroy(thisx->transportmsg);
+ thisx->transportmsg = NULL;
+
+ }
+ return destroy_objectex((etch_object*)thisx);
+}
+
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* i_deliveryservice (i_sessionmessage, i_transportmessage)
@@ -497,39 +632,39 @@ int destroy_tcp_delivery_service (void*
* convenience method to verify i_delivery_service, and from it,
* get, verify, and return the delivery service implementation object.
*/
-etch_tcp_delivery_service* get_etch_ds_impl (i_delivery_service* ids)
+etch_delivery_service* get_etch_ds_impl (i_delivery_service* ids)
{
- etch_tcp_delivery_service* tcpds = NULL;
+ etch_delivery_service* ds = NULL;
ETCH_ASSERT(is_etch_ideliverysvc(ids));
- tcpds = ids->thisx;
- ETCH_ASSERT(is_etch_deliverysvc(tcpds));
- return tcpds;
+
+ ds = (etch_delivery_service *) ids->thisx;
+ ETCH_ASSERT(is_etch_deliverysvc(ds));
+ return ds;
}
/**
- * tcpdelsvc_begincall()
+ * delsvc_begincall()
* i_deliveryservice :: begincall
* @param msg caller relinquishes on success, retains on failure
* @param out mailbox interface returned on success
* @return 0 success, or -1 failure. new mailbox return in out parameter.
*/
-int tcpdelsvc_begincall (i_delivery_service* ids, etch_message* msg, i_mailbox** out)
+int delsvc_begincall (i_delivery_service* ids, etch_message* msg, i_mailbox** out)
{
int result = 0;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_transportmessage* dstransport = tcpds->transport;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_transportmessage* dstransport = ds->transport;
ETCH_ASSERT(is_etch_transportmsg(dstransport));
/* transport is mailbox mgr pmboxmgr_transport_call(imbmgr) */
- result = tcpds->transportx->transport_call (tcpds->transportx, NULL, msg, out);
+ result = ds->transportx->transport_call (ds->transportx, NULL, msg, out);
return result;
}
-
/**
- * tcpdelsvc_endcall()
+ * delsvc_endcall()
* read the response message, close its mailbox and return the result object.
* @param mbox the current mailbox (interface), caller retains.
* @param response_type type of the response message, caller retains.
@@ -545,7 +680,7 @@ int tcpdelsvc_begincall (i_delivery_serv
* for example, if the service message is etch_int32* add(etch_int32*, etch_int32*),
* the result object will be an etch_int32 unless an exception occurred.
*/
-int tcpdelsvc_endcall (i_delivery_service* ids, i_mailbox* ibox, etch_type* response_type, etch_object** out)
+int delsvc_endcall (i_delivery_service* ids, i_mailbox* ibox, etch_type* response_type, etch_object** out)
{
int result = 0;
int timeout = 0;
@@ -553,7 +688,7 @@ int tcpdelsvc_endcall (i_delivery_servic
int32 default_timeout = 0;
etch_object* result_obj = NULL;
etch_mailbox_element* mbe = NULL;
- const char* thistext = "tcpdelsvc_endcall";
+ const char* thistext = "delsvc_endcall";
/* get the response message type's instance data */
etch_type_impl* typeinfo = response_type? (etch_type_impl*) response_type->impl: NULL;
ETCH_ASSERT(typeinfo && out);
@@ -643,7 +778,6 @@ int tcpdelsvc_endcall (i_delivery_servic
return result;
}
-
/* - - - - - - - - - - - - - - - - - - - - - - - - -
* i_deliveryservice :: i_sessionmessage (i_session)
* - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -655,58 +789,56 @@ int tcpdelsvc_endcall (i_delivery_servic
*/
/**
- * tcpdelsvc_session_message()
+ * delsvc_session_message()
* @param whofrom caller retains, can be null.
* @param msg caller relinquishes
* @return 0 (message handled), or -1 (error, closed, or timeout)
*/
-int tcpdelsvc_session_message (void* data, etch_who* whofrom, etch_message* msg)
+int delsvc_session_message (void* data, etch_who* whofrom, etch_message* msg)
{
i_delivery_service* ids = (i_delivery_service*)data;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_sessionmessage* dssession = tcpds->session;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_sessionmessage* dssession = ds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
return dssession->session_message(dssession->thisx, whofrom, msg);
}
-
/**
- * tcpdelsvc_session_control()
+ * delsvc_session_control()
* delivery service interface implementation of i_session_message
* @param control event, caller relinquishes.
* @param value control value, caller relinquishes.
*/
-int tcpdelsvc_session_control (void* data, etch_event* control, etch_object* value)
+int delsvc_session_control (void* data, etch_event* control, etch_object* value)
{
- i_delivery_service* ids = (i_delivery_service*)data;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_sessionmessage* dssession = tcpds->session;
+ i_delivery_service* ids = (i_delivery_service*)data;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_sessionmessage* dssession = ds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
return dssession->session_control(dssession->thisx, control, value);
}
-
/**
- * etch_tcpdelsvc_session_notify()
+ * delsvc_session_notify()
* @param evt event, caller relinquishes.
*/
-int tcpdelsvc_session_notify (void* data, etch_event* evt)
+int delsvc_session_notify (void* data, etch_event* evt)
{
- i_delivery_service* ids = (i_delivery_service*)data;
+ i_delivery_service* ids = (i_delivery_service*)data;
int result = -1, evtype = evt? evt->value: 0;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_sessionmessage* dssession = tcpds->session;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_sessionmessage* dssession = ds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
switch(evtype)
{
case ETCHEVT_SESSION_UP:
- etch_wait_set(tcpds->wait_up, evtype);
+ etch_wait_set(ds->wait_up, evtype);
break;
case ETCHEVT_SESSION_DOWN:
- etch_wait_set(tcpds->wait_down, evtype);
+ etch_wait_set(ds->wait_down, evtype);
break;
}
@@ -715,20 +847,19 @@ int tcpdelsvc_session_notify (void* data
}
/**
- * etch_tcpdelsvc_session_query()
+ * delsvc_session_query()
* @param query, caller relinquishes.
*/
-etch_object* tcpdelsvc_session_query (void* data, etch_query* query)
+etch_object* delsvc_session_query (void* data, etch_query* query)
{
i_delivery_service* ids = (i_delivery_service*)data;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_sessionmessage* dssession = tcpds->session;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_sessionmessage* dssession = ds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
return dssession->session_query (dssession->thisx, query);
}
-
/* - - - - - - - - - - - - - - - - - - - - - - - - - - -
* i_deliveryservice :: i_transportmessage (i_transport)
* - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -736,23 +867,20 @@ etch_object* tcpdelsvc_session_query (vo
/**
- * tcpdelsvc_get_session()
+ * delsvc_get_session()
* @return a reference to the delivery service i_sessionmessage interface.
* caller does not own this object.
*/
-i_session* tcpdelsvc_get_session (void* data)
+i_session* delsvc_get_session (void* data)
{
i_delivery_service* ids = (i_delivery_service*)data;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
- return (i_session*)tcpds->session;
+ return (i_session*)ds->session;
}
-
-
-
/**
- * tcpdelsvc_transport_control()
+ * delsvc_transport_control()
* @param control, caller relinquishes.
* @param value control value, caller relinquishes.
* @remarks as it currently stands, the value object passed through these transport
@@ -761,22 +889,22 @@ i_session* tcpdelsvc_get_session (void*
* etch_string, etch_date, etch_event, etch_object, and others); or by virtue of
* having custom clone() functions assigned to them.
*/
-int tcpdelsvc_transport_control (void* data, etch_event* control, etch_object* valobj)
+int delsvc_transport_control (void* data, etch_event* control, etch_object* valobj)
{
i_delivery_service* ids = (i_delivery_service*)data;
int result = 0;
etch_connection* cx = NULL;
i_transportmessage* dstransport = NULL;
- etch_tcp_delivery_service* tcpds = NULL;
+ etch_delivery_service* ds = NULL;
const int objclass = control? ((etch_object*)control)->class_id: 0;
const int timeoutms = control && (is_etch_int32(valobj))? ((etch_int32*)control)->value: 0;
ETCH_ASSERT(is_etch_ideliverysvc(ids) && objclass);
- tcpds = get_etch_ds_impl(ids); /* delivery service implementation */
- ETCH_ASSERT(is_etch_deliverysvc(tcpds));
- dstransport = tcpds->transport; /* delivery service transport (mailbox mgr) */
+ ds = get_etch_ds_impl(ids); /* delivery service implementation */
+ ETCH_ASSERT(is_etch_deliverysvc(ds));
+ dstransport = ds->transport; /* delivery service transport (mailbox mgr) */
ETCH_ASSERT(is_etch_transportmsg(dstransport));
- cx = &tcpds->tcpconx->cx; /* underlying connection */
+ cx = &ds->conx->cx; /* underlying connection */
switch(objclass) /* forward the transport event */
{
@@ -826,26 +954,24 @@ int tcpdelsvc_transport_control (void* d
return result;
}
-
/**
- * tcpdelsvc_transport_notify()
+ * delsvc_transport_notify()
* @param evt, caller relinquishes.
*/
-int tcpdelsvc_transport_notify (void* data, etch_event* evt)
+int delsvc_transport_notify (void* data, etch_event* evt)
{
i_delivery_service* ids = (i_delivery_service*)data;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
- return tcpds->transport->transport_notify( tcpds->transport->thisx, evt);
+ return ds->transport->transport_notify( ds->transport->thisx, evt);
}
-
/**
- * tcpdelsvc_transport_query()
+ * delsvc_transport_query()
* i_transportmessage::transport_query override.
* @param query, caller relinquishes.
*/
-etch_object* tcpdelsvc_transport_query (void* data, etch_query* query)
+etch_object* delsvc_transport_query (void* data, etch_query* query)
{
i_delivery_service* ids = (i_delivery_service*)data;
int result = 0;
@@ -853,9 +979,9 @@ etch_object* tcpdelsvc_transport_query (
etch_connection* cx = NULL;
const int timeoutms = query? query->value: 0;
const int objclass = query? ((etch_object*)query)->class_id: 0;
- etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
- i_transportmessage* dstransport = tcpds->transport;
- cx = &tcpds->tcpconx->cx;
+ etch_delivery_service* ds = get_etch_ds_impl(ids);
+ i_transportmessage* dstransport = ds->transport;
+ cx = &ds->conx->cx;
switch(objclass)
{
@@ -877,7 +1003,6 @@ etch_object* tcpdelsvc_transport_query (
return resultobj;
}
-
/* - - - - - - - - - - - - - - - - - - - - - -
* etch_resources
* - - - - - - - - - - - - - - - - - - - - - -
@@ -1210,11 +1335,11 @@ etch_server_factory* new_server_factory
/*
- * tcpxfact_get_session()
+ * connection_fact_get_session()
* return session interface from the server factory bundle.
* validate and assert the i_sessionlistener object.
*/
-i_session* tcpxfact_get_session (i_sessionlistener* lxr)
+i_session* connection_fact_get_session (i_sessionlistener* lxr)
{
i_session* session = NULL;
etch_server_factory* factory = NULL;
@@ -1226,15 +1351,15 @@ i_session* tcpxfact_get_session (i_sessi
/*
- * tcpxfact_session_control()
+ * connection_fact_session_control()
* @param control event, caller relinquishes.
* @param value control value, caller relinquishes.
*/
-int tcpxfact_session_control (void* data, etch_event* control, etch_object* value)
+int connection_fact_session_control (void* data, etch_event* control, etch_object* value)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
int result = -1;
- i_session* session = tcpxfact_get_session (thisx);
+ i_session* session = connection_fact_get_session (thisx);
if (session && session->session_control)
result = session->session_control (session, control, value);
@@ -1249,14 +1374,14 @@ int tcpxfact_session_control (void* data
/*
- * tcpxfact_session_notify()
+ * connection_fact_session_notify()
* @param evt event, caller relinquishes.
*/
-int tcpxfact_session_notify (void* data, etch_event* evt)
+int connection_fact_session_notify (void* data, etch_event* evt)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
int result = -1;
- i_session* session = tcpxfact_get_session (thisx);
+ i_session* session = connection_fact_get_session (thisx);
if (session && session->session_notify)
result = session->session_notify (session, evt);
@@ -1268,14 +1393,14 @@ int tcpxfact_session_notify (void* data,
/*
- * tcpxfact_session_query()
+ * connection_fact_session_query()
* @param query caller relinquishes
*/
-etch_object* tcpxfact_session_query (void* data, etch_query* query)
+etch_object* connection_fact_session_query (void* data, etch_query* query)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
- void* resultobj = NULL;
- i_session* session = tcpxfact_get_session (thisx);
+ etch_object* resultobj = NULL;
+ i_session* session = connection_fact_get_session (thisx);
if (session && session->session_query)
resultobj = session->session_query (session, query);
@@ -1316,7 +1441,7 @@ int transport_session_count (i_sessionli
/*
- * tcpxfact_teardown_client_sessions()
+ * transport_teardown_client_sessions()
* signal and wait for each session thread to exit, destroying each
* thread, connection and session. tearing down the session destroys its
* delivery service, remote client, and stub. this is intended to be invoked
@@ -1367,7 +1492,7 @@ int etch_listener_waitfor_exit (i_sessio
/*
- * tcpxfact_session_accepted()
+ * connection_fact_session_accepted()
* override for transport factory session_accepted()
* signature is typedef int (*etch_session_accepted) (void* thisx, void* socket);
* parallels java TcpTransportFactory.newListener.newSessionListener.sessionAccepted
@@ -1376,19 +1501,18 @@ int etch_listener_waitfor_exit (i_sessio
* in practice this is an apr socket wrapped by etch_socket.
* @return 0 success, -1 failure.
*/
-int tcpxfact_session_accepted (void* data, void* connectionData)
+int connection_fact_session_accepted (void* data, void* connectionData)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
- etch_tcp_connection* tcpconx = (etch_tcp_connection*)connectionData;
+ etch_transport_connection* tconx = (etch_transport_connection*)connectionData;
int result = 0;
void* newstub = NULL;
etch_session* newsession = NULL;
etch_server_factory* params = NULL;
- etch_connection* cx = &tcpconx->cx;
+ etch_connection* cx = &tconx->cx;
i_delivery_service* delivery_service = NULL;
const int session_id = cx->conxid;
ETCH_ASSERT(is_etch_sessionlxr(thisx));
- ETCH_ASSERT(is_etch_tcpconnection(tcpconx));
params = (etch_server_factory*) thisx->server_params;
ETCH_ASSERT(params && params->helper_new_listener);
@@ -1402,7 +1526,7 @@ int tcpxfact_session_accepted (void* dat
/* instantiate delivery service */
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "creating delivery service ...\n");
- delivery_service = new_etch_transport_a (thisx->url, thisx->server_params, tcpconx);
+ delivery_service = new_etch_transport_a (thisx->url, thisx->server_params, tconx);
if (NULL == delivery_service)
{ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "could not create delivery service\n");
@@ -1415,7 +1539,7 @@ int tcpxfact_session_accepted (void* dat
newsession = new_etch_clientsession (params, cx);
newsession->mainlistener = thisx; /* session points back to accept listener */
newsession->ds = delivery_service;
- newsession->conximpl = (etch_object*) tcpconx;
+ newsession->conximpl = (etch_object*) tconx;
/* CALL BACK to helper.xxx_helper_listener_create to create this
* client's server side listener, server implementation, and stub.
@@ -1467,8 +1591,7 @@ int destroy_etch_listener (void* data)
* have mutual references. we must ensure that if we are to
* destroy the etch_tcp_server via the i_sessionlistener, that the
* etch_tcp_server does not also destroy the i_sessionlistener. */
- etch_tcp_server* srvobj = (etch_tcp_server*) thisx->thisx;
- ETCH_ASSERT(is_etch_tcpserver(srvobj));
+ etch_object* srvobj = (etch_object*) thisx->thisx;
etch_object_destroy(srvobj);
}
@@ -1502,24 +1625,60 @@ i_sessionlistener* new_etch_listener (wc
main_server_create_func main_server_create,
helper_resources_init_func helper_resources_init)
{
- etch_tcp_server* tcp_server = NULL;
+ etch_url* url = new_url(uri);
+
+ i_sessionlistener* newsl = NULL;
+
+ if (is_url_scheme_udp(url))
+ {
+ newsl = new_etch_listener_a (url, resx, factory_thisx, helper_listener_create, main_server_create, helper_resources_init, (new_server_func)new_udp_server);
+ }
+ #if(0)
+ else /* handlers for other url schemes follow here eventually */
+ if (is_url_scheme_foo(url))
+ {
+ /* ... */
+ }
+ #endif
+ else
+ { /* url schemes http, tcp, default */
+ newsl = new_etch_listener_a (url, resx, factory_thisx, helper_listener_create, main_server_create, helper_resources_init, (new_server_func)new_tcp_server);
+ }
+
+ return newsl;
+}
+
+
+/*
+ * new_etch_listener_a()
+ * constructs a new transport listener used to construct server sessions.
+ * returns a transport interface, whereas c binding will instead extract the
+ * transport interface from i_sessionlistener.itransport.
+ */
+i_sessionlistener* new_etch_listener_a (etch_url* url, etch_resources* resx,
+ void* factory_thisx,
+ helper_listener_create_func helper_listener_create,
+ main_server_create_func main_server_create,
+ helper_resources_init_func helper_resources_init,
+ new_server_func new_server_create)
+{
+ etch_server* server = NULL;
etch_server_factory* params = NULL;
- etch_url* url = new_url(uri);
/* listener assumes the session interface of the server factory creator.
* this accomplishes the same thing as the session method implementations
* found in java TcpTransportFactory.newListener().
*/
i_session* isession = new_session_interface (NULL,
- tcpxfact_session_control,
- tcpxfact_session_notify,
- tcpxfact_session_query);
+ connection_fact_session_control,
+ connection_fact_session_notify,
+ connection_fact_session_query);
/* create the listener interface, specifying the on_session_accepted
* callback to be invoked on each successful server accept in order
* to create a new server. relinquish isession to listener here. */
i_sessionlistener* listener = new_sessionlistener_interface (NULL,
- tcpxfact_session_accepted, isession);
+ connection_fact_session_accepted, isession);
((etch_object*)listener)->destroy = destroy_etch_listener;
listener->wait_exit = etch_listener_waitfor_exit;
@@ -1534,43 +1693,42 @@ i_sessionlistener* new_etch_listener (wc
listener->is_resources_owned = TRUE;
listener->resources = get_etch_transport_resources (resx); /* resx null */
params->in_resx = listener->resources;
- params->in_uri = uri;
+ params->in_uri = url->raw;
helper_resources_init(params);
listener->server_params = params;
/* fyi params delivery service is set later, in svr->on_session_accepted(),
- whose implementation is tcpxfact_session_accepted(), in this module */
+ whose implementation is connection_fact_session_accepted(), in this module */
/* create the tcp connection and acceptor SVR BREAK 001 */
- tcp_server = new_tcp_server (url, params->mainpool, params->subpool, resx, listener);
+ server = new_server_create (url, params->mainpool, params->subpool, resx, listener);
- if (NULL == tcp_server) {
+ if (NULL == server) {
etch_object_destroy(listener);
return NULL;
}
/* listener [main] expects that i_sessionlistener.thisx is the server,
* e.g. an etch_tcp_server* */
- listener->thisx = tcp_server;
+ listener->thisx = server;
/* copy server object's session virtuals to this object */
/* see java TcpTransportFactory.newListener() for session impls */
- listener->session = tcp_server->session;
- listener->isession = tcp_server->isession;
- listener->session_control = tcp_server->session_control;
- listener->session_notify = tcp_server->session_notify;
- listener->session_query = tcp_server->session_query;
+ listener->session = server->session;
+ listener->isession = server->isession;
+ listener->session_control = server->session_control;
+ listener->session_notify = server->session_notify;
+ listener->session_query = server->session_query;
/* set this listener object's transport to be the server connection's transport */
- ETCH_ASSERT(tcp_server->itransport);
+ ETCH_ASSERT(server->itransport);
etch_free(listener->itransport); /* TODO don't instantiate in the first place */
- listener->itransport = tcp_server->itransport;
- listener->transport_control = tcp_server->transport_control;
- listener->transport_notify = tcp_server->transport_notify;
- listener->transport_query = tcp_server->transport_query;
- listener->set_session = tcp_server->set_session;
- listener->get_session = tcp_server->get_session;
+ listener->itransport = server->itransport;
+ listener->transport_control = server->transport_control;
+ listener->transport_notify = server->transport_notify;
+ listener->transport_query = server->transport_query;
+ listener->set_session = server->set_session;
+ listener->get_session = server->get_session;
listener->is_transport_owned = FALSE;
return listener; /* caller owns this object */
}
-
Copied: incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_udp_connection.c (from r1149205, incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_tcp_connection.c)
URL: http://svn.apache.org/viewvc/incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_udp_connection.c?p2=incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_udp_connection.c&p1=incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_tcp_connection.c&r1=1149205&r2=1149207&rev=1149207&view=diff
==============================================================================
--- incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_tcp_connection.c (original)
+++ incubator/etch/branches/singlestack/binding-c/runtime/c/src/main/transport/etch_udp_connection.c Thu Jul 21 14:56:59 2011
@@ -17,182 +17,118 @@
*/
/*
- * etch_tcpconxn.c
- * tcp connection class
+ * etch_udpconxn.c
+ * udp connection class
*/
#include "etch_thread.h"
-#include "etch_tcp_connection.h"
+#include "etch_udp_connection.h"
#include "etch_encoding.h"
#include "etch_flexbuffer.h"
#include "etch_log.h"
#include "etch_objecttypes.h"
+#include "etch_connection_event.h"
+#include "etch_inet_who.h"
-static const char* LOG_CATEGORY = "etch_tcp_connection";
+static const char* LOG_CATEGORY = "etch_udp_connection";
// extern types
extern apr_pool_t* g_etch_main_pool;
extern apr_thread_mutex_t* g_etch_main_pool_mutex;
-int etch_tcpconx_closex(etch_tcp_connection*, const int, const int);
-i_session* etch_tcpclient_get_session (void*);
-
-
-//extern char* ETCH_CONNECTION_LOGID;
-#define ETCH_SHUTDOWNSIGNALSIZE (sizeof(ETCH_SHUTDOWNSIGNAL)-1)
-
-unsigned connection_id_farm;
-
-#if(0)
-
- TCPCONNECTION
- | Socket, hostIP, port, delay, isKeepalive, isNoDelay
- | buffersize, isAutoflush, trafficclass
- | InputStream, OutputStream
- | stop0(); openSocket(); setupSocket(); readSocket();
- | close(); send(); flush(); shutdownInput(); shutdownOutput();
- | remoteAddress(); fireData(); transportData();
- |
- - CONNECTION<SESSIONDATA>
- | | Monitor status;
- | | Connection(); started(); stopped(); exception();
- | | run0(); localAddress(); translateHost();
- | | openSocket(); setupSocket(); readSocket(); close();
- | | transportQuery(); transportControl(); transportNotify();
- | | fireUp(); fireDown();
- | | void* getSession(); setSession(void*); waitUp(); waitDown();
- | |
- | - SESSION
- | | sessionQuery(); sessionControl(); sessionNotify();
- | |
- | - RUNNER
- | | | Thread thread;
- | | | RunnerHandler handler;
- | | | start0()
- | | | stop0()
- | | | run()
- | | | run0()
- | | | fireStarted()
- | | | fireStopped()
- | | | fireException()
- | | - ABSTRACTSTARTABLE
- | |
- | - TRANSPORT<SESSIONDATA>
- | | transportQuery(); transportControl(); transportNotify();
- | |
- | - RUNNERHANDLER interface
- | started(); stopped(); exception();
- |
- - TRANSPORTDATA
- | int transportData(to, buffer);
- | int headerSize;
- - TRANSPORT
- transportQuery(); transportControl(); transportNotify();
-#endif
-
-
-
+/*
+ * is_good_tcp_params()
+ */
+int is_good_udp_params(etch_url* url, void* resources, etch_rawsocket* socket)
+{
+ return is_good_conn_params(url, resources, socket);
+}
/**
- * etch_tcpconx_set_socket_options()
+ * etch_udpconx_set_socket_options()
*/
-int etch_tcpconx_set_socket_options(void* data)
+int etch_udpconx_set_socket_options(void* data)
{
- etch_tcp_connection *c = (etch_tcp_connection*)data;
- int arc = 0, ecount = 0;
- etch_connection_event_handler eventx;
- etch_rawsocket* socket = c? c->cx.socket: NULL;
+ etch_udp_connection *c = (etch_udp_connection*)data;
+ int arc = 0, ecount = 0;
+ etch_connection_event_handler eventx;
+ etch_rawsocket* socket = c? c->cx.socket: NULL;
if (!socket) return -1;
- eventx = c->cx.on_event;
+ eventx = c->cx.on_event;
- /*
- * APR_SO_DEBUG - turn on debugging information
- * APR_SO_KEEPALIVE - keep connections active
- * APR_SO_LINGER - lingers on close if data is present
- * APR_SO_NONBLOCK - turns blocking on/off for socket
- * when this option is enabled, use the APR_STATUS_IS_EAGAIN() macro
- * to determine if a send or receive function could not transfer data
- * without blocking.
- * APR_SO_REUSEADDR - the rules used in validating addresses
- * supplied to bind should allow reuse of local addresses.
- * APR_SO_SNDBUF - set the send buffer size
- * APR_SO_RCVBUF - set the receive buffer size
- */
-
- if (0 != (arc = apr_socket_opt_set(socket, APR_SO_KEEPALIVE, c->is_keepalive)))
- ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "keepalive");
-
- if (0 != (arc = apr_socket_opt_set(socket, APR_SO_LINGER, c->linger)))
- ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "linger");
-
- if (0 != (arc = apr_socket_opt_set(socket, APR_TCP_NODELAY, c->is_nodelay)))
- ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "nodelay");
+ if (0 != (arc = apr_socket_opt_set(socket, APR_SO_REUSEADDR, c->is_reuseaddr)))
+ ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "reuseaddr");
- /*
- if (0 != (arc = apr_socket_opt_set(socket, APR_SO_NONBLOCK, FALSE)))
- ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "do not block");
- */
-
- /* timeout < 0 = block, 0 = never block, > 0 = block until timeout
- if (0 != (arc = apr_socket_timeout_set(socket, -1)))
- ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "socket timeout");
- */
+#if defined(APR_SO_BROADCAST)
+ if (0 != (arc = apr_socket_opt_set(socket, APR_SO_BROADCAST, c->is_broadcast)))
+ ecount += eventx(c, ETCH_CONXEVT_SOCKOPTERR, arc, "broadcast");
+#else
+ if (c->is_broadcast)
+ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_WARN, "Your APR version doesn't support setting SO_BROADCAST flag.\n");
+#endif
- return ecount == 0? 0: -1;
+ return ecount == 0 ? 0: -1;
}
/**
- * etch_tcpclient_on_data()
- * tcp socket received data handler.
+ * etch_udpclient_on_data()
+ * udp socket received data handler.
* @param cx the connection object.
* @param unused parameter not currently used.
* @param length number of bytes in the supplied data buffer.
* @param data the data as received via the socket wrapped in a flexbuffer.
* caller retains this memory.
- * @remarks todo: if this remains the same as etch_tcpsvr_on_data, replace both
- * methods with a etch_tcpconx_on_data() containing the same code.
+ * @remarks todo: if this remains the same as etch_udpsvr_on_data, replace both
+ * methods with a etch_udpconx_on_data() containing the same code.
*/
-int etch_tcpclient_on_data (void* thisData, const int unused, int length, void* bufferData)
+int etch_udpclient_on_data (void* thisData, const int unused, int length, void* bufferData)
{
etch_connection* cx = (etch_connection*)thisData;
etch_flexbuffer* data = (etch_flexbuffer*)bufferData;
int result = 0;
- i_sessiondata* session = NULL;
- etch_tcp_connection* tcpx = cx? (etch_tcp_connection*) cx->owner: NULL;
- ETCH_ASSERT(is_etch_tcpconnection(tcpx));
+ i_sessionpacket* session = NULL;
+ etch_inet_who *whofrom = NULL;
+ apr_sockaddr_t *sockaddr = NULL;
+ etch_udp_connection* udpx = cx? (etch_udp_connection*) cx->owner: NULL;
+
+ ETCH_ASSERT(is_etch_udpconnection(udpx));
ETCH_ASSERT(is_etch_flexbuffer(data));
- session = tcpx->session;
+
+ session = udpx->session;
+ whofrom = new_inet_who(udpx->remote_addr);
- /* send the data up the chain to be packetized. note that tcpx->session->thisx
- * is the owner of the i_sessiondata* session, which is the next higher layer
+ /* send the data up the chain to be packetized. note that udpx->session->thisx
+ * is the owner of the i_sessionpacket* session, which is the next higher layer
* of the transport stack, which is ordinarily the packetizer.
*/
- if (-1 == (result = session->session_data (session->thisx, NULL, data)))
+ if (-1 == (result = session->session_packet(session->thisx, whofrom, data)))
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_ERROR, "current %d bytes on connxn %d rejected\n", length, cx->conxid);
return result;
}
/*
- * new_tcp_connection()
- * etch_tcp_connection tcp client constructor
+ * new_udp_connection()
+ * etch_udp_connection udp client constructor
*/
-etch_tcp_connection* new_tcp_connection(etch_url* url, void* resources, etch_rawsocket* socket)
+etch_udp_connection* new_udp_connection(etch_url* url, void* resources, etch_rawsocket* socket)
{
int result = -1, item = 0;
- etch_tcp_connection* newcon = NULL;
- if (!is_good_tcp_params (url, resources, socket)) return NULL;
+ etch_udp_connection* newcon = NULL;
+ if (!is_good_udp_params (url, resources, socket)) return NULL;
- newcon = (etch_tcp_connection*)new_object (sizeof(etch_tcp_connection), ETCHTYPEB_CONNECTION, CLASSID_TCP_CONNECTION);
- ((etch_object*)newcon)->destroy = destroy_etch_tcp_connection;
+ newcon = (etch_udp_connection*)new_object (sizeof(etch_udp_connection), ETCHTYPEB_CONNECTION, CLASSID_UDP_CONNECTION);
+ ((etch_object*)newcon)->destroy = destroy_etch_udp_connection;
+ newcon->is_server = resources == NULL;
+ newcon->remote_addr = NULL;
do /* populate connection's transport and session interfaces */
- { if (-1 == (result = init_etch_tcpconx_interfaces(newcon))) break;
+ { if (-1 == (result = init_etch_udpconx_interfaces(newcon))) break;
if (-1 == (result = etch_init_connection (&newcon->cx, socket, newcon))) break;
- newcon->cx.set_socket_options = etch_tcpconx_set_socket_options;
- newcon->cx.on_event = etch_tcpconx_on_event; /* connection state handler */
+ newcon->cx.set_socket_options = etch_udpconx_set_socket_options;
+ newcon->cx.on_event = etch_def_connection_on_event; /* connection state handler */
if (socket)
newcon->cx.socket = socket;
@@ -201,74 +137,78 @@ etch_tcp_connection* new_tcp_connection(
// TODO: pool
etch_encoding_transcode_wchar(&newcon->cx.hostname, ETCH_ENCODING_UTF8, url->host, NULL);
newcon->cx.port = url->port;
- etchurl_get_integer_term (url, ETCH_CONNECTION_RECONDELAY, &newcon->cx.delay);
}
/* set term default values - values for any terms not set here are zero */
- newcon->is_nodelay = ETCH_CONNECTION_DEFNODELAY;
- newcon->linger = ETCH_CONNECTION_DEFLINGERTIME;
/* set any terms which may have been supplied with URL */
- etchurl_get_boolean_term(url, ETCH_CONNECTION_AUTOFLUSH, &newcon->is_autoflush);
- etchurl_get_boolean_term(url, ETCH_CONNECTION_KEEPALIVE, &newcon->is_keepalive);
- etchurl_get_boolean_term(url, ETCH_CONNECTION_NODELAY, &newcon->is_nodelay);
- etchurl_get_integer_term(url, ETCH_CONNECTION_LINGERTIME, &newcon->linger);
- etchurl_get_integer_term(url, ETCH_CONNECTION_TRAFCLASS, &newcon->traffic_class);
- etchurl_get_integer_term(url, ETCH_CONNECTION_BUFSIZE, &item);
- if (item > 0) newcon->cx.bufsize = item;
+ etchurl_get_boolean_term(url, ETCH_CONNECTION_REUSE_PORT, &newcon->is_reuseaddr);
+ etchurl_get_boolean_term(url, ETCH_CONNECTION_BROADCAST, &newcon->is_broadcast);
+
result = 0;
} while(0);
- newcon->cx.on_data = etch_tcpclient_on_data;
+ newcon->cx.on_data = etch_udpclient_on_data;
newcon->cx.on_event(newcon, result? ETCH_CONXEVT_CREATERR: ETCH_CONXEVT_CREATED, 0, 0);
if (-1 == result)
- { destroy_etch_tcp_connection(newcon);
+ { destroy_etch_udp_connection(newcon);
return NULL;
}
else return newcon;
}
/**
- * etch_tcpclient_set_session()
+ * etch_client_get_session
+ * i_transport::get_session implementation
+ */
+i_session* etch_udpclient_get_session (void* data)
+{
+ etch_udp_connection* thisx = (etch_udp_connection*)data;
+ ETCH_ASSERT(is_etch_udpconnection(thisx));
+ return (i_session*)thisx->session;
+}
+
+/**
+ * etch_udpclient_set_session()
* i_transport::set_session() override
- * @param session an i_sessiondata*. caller retains this object.
+ * @param session an i_sessionpacket*. caller retains this object.
*/
-void etch_tcpclient_set_session (void* data, void* newsession)
+void etch_udpclient_set_session (void* data, void* newsession)
{
- etch_tcp_connection* thisx = (etch_tcp_connection*)data;
- ETCH_ASSERT(is_etch_tcpconnection(thisx));
- ETCH_ASSERT(is_etch_sessiondata(newsession));
+ etch_udp_connection* thisx = (etch_udp_connection*)data;
+ ETCH_ASSERT(is_etch_udpconnection(thisx));
+ ETCH_ASSERT(is_etch_sessionpacket(newsession));
if (thisx->is_session_owned){
etch_object_destroy(thisx->session);
thisx->session = NULL;
}
thisx->is_session_owned = FALSE;
- thisx->session = newsession;
+ thisx->session = (i_sessionpacket*)newsession;
}
/**
- * etch_tcpconx_transport_control()
+ * etch_udpconx_transport_control()
* connection::i_transport::transport_control override.
* this is the base connection class' implementation of i_transport.
* this is java binding's Connection.transportControl(), and serves as the
- * Transport part of the java TcpConnection TransportData.
- * while tcp connection does implement i_transportdata, tcp connection's
+ * Transport part of the java UdpConnection TransportData.
+ * while udp connection does implement i_transportdata, udp connection's
* implementation of i_transport comes from its inheritance of connection,
* and its implementation of TransportData. since we do not separately implement
* the connection class, the i_transport methods are implemented here.
* @param control the event, sender relinquishes.
* @param value control value, sender relinquishes.
*/
-int etch_tcpconx_transport_control (void* data, etch_event* control, etch_object* value)
+int etch_udpconx_transport_control (void* data, etch_event* control, etch_object* value)
{
- etch_tcp_connection* thisx = (etch_tcp_connection*)data;
+ etch_udp_connection* thisx = (etch_udp_connection*)data;
etch_connection* cx = NULL;
int result = 0, timeoutms = 0;
const int objclass = control? ((etch_object*)control)->class_id: 0;
const int is_client = is_etch_int32(value)? ((etch_int32*)value)->value: NULL;
- ETCH_ASSERT(is_etch_tcpconnection(thisx));
+ ETCH_ASSERT(is_etch_udpconnection(thisx));
cx = &thisx->cx;
@@ -276,10 +216,10 @@ int etch_tcpconx_transport_control (void
{
case CLASSID_CONTROL_START:
- result = etch_tcpconx_start (thisx);
+ result = etch_udpconx_start (thisx);
- if (is_client && 0 == result)
- result = etch_tcpclient_start_listener (thisx);
+ if (0 == result)
+ result = etch_udpclient_start_listener (thisx);
break;
case CLASSID_CONTROL_START_WAITUP:
@@ -287,14 +227,14 @@ int etch_tcpconx_transport_control (void
/* open the connection, and wait for completion. caller blocks by virtue
* of the fact that this is of course a function call, not a message handler.
* timeout is communicated to caller via result code 1 = ETCH_TIMEOUT.
- * it is not clear why wait up is implemented here. since a tcp server
+ * it is not clear why wait up is implemented here. since a udp server
* implements transport interface itself, a server will never invoke this
* implementation. on the other hand, the requester of a client connection
* and the socket itself are the same thread, wait up therefore being
* meaningless since the socket open is known to be complete prior to
* invoking wait up.
*/
- if (0 == (result = etch_tcpconx_open (thisx, ETCH_CONX_NOT_RECONNECTING)))
+ if (0 == (result = etch_udpconx_open (thisx, ETCH_CONX_NOT_RECONNECTING)))
{ timeoutms = value? ((etch_int32*) value)->value: 0;
result = etchconx_wait_up (cx, timeoutms);
}
@@ -303,15 +243,15 @@ int etch_tcpconx_transport_control (void
case CLASSID_CONTROL_STOP:
if (is_client)
- result = etch_tcpclient_stop_listener (thisx);
+ result = etch_udpclient_stop_listener (thisx);
else
- result = etch_tcpconx_close (thisx, ETCH_CONX_NO_LINGER);
+ result = etch_udpconx_close (thisx);
break;
case CLASSID_CONTROL_STOP_WAITDOWN:
/* see comments above at CLASSID_CONTROL_START_WAITUP */
- if (0 == (result = etch_tcpconx_close (thisx, ETCH_CONX_NO_LINGER)))
+ if (0 == (result = etch_udpconx_close (thisx)))
{ timeoutms = value? ((etch_int32*) value)->value: 0;
result = etchconx_wait_down (cx, timeoutms);
}
@@ -324,14 +264,14 @@ int etch_tcpconx_transport_control (void
}
/**
- * etch_tcpconx_transport_notify()
+ * etch_udpconx_transport_notify()
* i_transport::transport_notify override.
* @param evt, caller relinquishes.
*/
-int etch_tcpconx_transport_notify (void* data, etch_event* evt)
+int etch_udpconx_transport_notify (void* data, etch_event* evt)
{
- etch_tcp_connection* thisx = (etch_tcp_connection*)data;
- ETCH_ASSERT(is_etch_tcpconnection(thisx));
+ etch_udp_connection* thisx = (etch_udp_connection*)data;
+ ETCH_ASSERT(is_etch_udpconnection(thisx));
etch_object_destroy(evt);
return 0; /* nothing to do */
}
@@ -339,19 +279,19 @@ int etch_tcpconx_transport_notify (void*
/**
- * etch_tcpconx_transport_query()
+ * etch_udpconx_transport_query()
* i_transport::transport_query override.
* @param query, caller relinquishes.
*/
-etch_object* etch_tcpconx_transport_query (void* data, etch_query* query)
+etch_object* etch_udpconx_transport_query (void* data, etch_query* query)
{
- etch_tcp_connection* thisx = (etch_tcp_connection*)data;
+ etch_udp_connection* thisx = (etch_udp_connection*)data;
int result = 0;
etch_object* resultobj = NULL;
etch_connection* cx = NULL;
const int timeoutms = query? query->value: 0;
const int objclass = query? ((etch_object*)query)->class_id: 0;
- ETCH_ASSERT(is_etch_tcpconnection(thisx));
+ ETCH_ASSERT(is_etch_udpconnection(thisx));
cx = &thisx->cx;
switch(objclass)
@@ -379,10 +319,10 @@ etch_object* etch_tcpconx_transport_quer
/*
- * etch_tcpclient_sendex()
+ * etch_udpclient_sendex()
* send data with specified timeout
*/
-int etch_tcpclient_sendex (etch_tcp_connection *conx, unsigned char* buf,
+int etch_udpclient_sendex (etch_udp_connection *conx, etch_who* whoto, unsigned char* buf,
const size_t totallen, const int timeout_ms, int* rc)
{
int arc = 0, is_eod = 0;
@@ -404,7 +344,11 @@ int etch_tcpclient_sendex (etch_tcp_conn
{
datalen = totallen;
- is_eod = (APR_EOF == (arc = apr_socket_send(socket, (char*)(buf + totalsent), &datalen)));
+ if (is_etch_inet_who(whoto))
+ is_eod = (APR_EOF == (arc = apr_socket_sendto(socket,
+ inet_who_sockaddr((etch_inet_who*)whoto), 0, (char*)(buf + totalsent), &datalen)));
+ else
+ is_eod = (APR_EOF == (arc = apr_socket_send(socket, (char*)(buf + totalsent), &datalen)));
totalsent += datalen; remaining -= datalen;
@@ -425,86 +369,87 @@ int etch_tcpclient_sendex (etch_tcp_conn
}
/*
- * etch_tcpclient_send()
+ * etch_udpclient_send()
*/
-int etch_tcpclient_send (etch_tcp_connection *conx, unsigned char* buf, const size_t totallen, int* rc)
+int etch_udpclient_send (etch_udp_connection *conx, etch_who* whoto, unsigned char* buf, const size_t totallen, int* rc)
{
- return etch_tcpclient_sendex(conx, buf, totallen, 0, rc);
+ return etch_udpclient_sendex(conx, whoto, buf, totallen, 0, rc);
}
/*
- * etch_tcpconx_transport_data()
- * etch_tcp_connection::i_transportdata::transport_data
+ * etch_udpconx_transport_packet()
+ * etch_udp_connection::i_transportpacket::transport_packet
* @param whoto caller retains
* @param fbuf caller retains
*/
-int etch_tcpconx_transport_data (void* data, etch_who* whoto, etch_flexbuffer* fbuf)
+int etch_udpconx_transport_packet (void* data, etch_who* whoto, etch_flexbuffer* fbuf)
{
- etch_tcp_connection* thisx = (etch_tcp_connection*)data;
+ etch_udp_connection* thisx = (etch_udp_connection*)data;
int result = 0, apr_rc = 0;
- ETCH_ASSERT(is_etch_tcpconnection(thisx));
+ ETCH_ASSERT(is_etch_udpconnection(thisx));
- result = etch_tcpclient_send (thisx, fbuf->buf, fbuf->datalen, &apr_rc);
+ result = etch_udpclient_send (thisx, whoto, fbuf->buf, fbuf->datalen, &apr_rc);
etch_flexbuf_reset(fbuf);
return result;
}
/*
- * init_etch_tcpcon_interfaces()
- * populate transport and placeholder session interfaces to tcp connection.
+ * init_etch_udpcon_interfaces()
+ * populate transport and placeholder session interfaces to udp connection.
*/
-int init_etch_tcpconx_interfaces (etch_tcp_connection* tcpx)
+int init_etch_udpconx_interfaces (etch_udp_connection* udpx)
{
i_transport* itransport = NULL;
- ETCH_ASSERT(is_etch_tcpconnection(tcpx));
- if (tcpx->itd) return 0; /* already initialized */
-
- itransport = new_transport_interface_ex (tcpx,
- etch_tcpconx_transport_control,
- etch_tcpconx_transport_notify,
- etch_tcpconx_transport_query,
- etch_tcpclient_get_session,
- etch_tcpclient_set_session);
+ ETCH_ASSERT(is_etch_udpconnection(udpx));
+ if (udpx->itp) return 0; /* already initialized */
- tcpx->itd = new_transportdata_interface (tcpx,
- (void*)etch_tcpconx_transport_data, itransport); /* itd now owns itransport */
+ itransport = new_transport_interface_ex (udpx,
+ etch_udpconx_transport_control,
+ etch_udpconx_transport_notify,
+ etch_udpconx_transport_query,
+ etch_udpclient_get_session,
+ etch_udpclient_set_session);
+
+ udpx->itp = new_transportpkt_interface(udpx,
+ (etch_transport_packet)etch_udpconx_transport_packet, itransport); /* itp now owns itransport */
+ udpx->itp->header_size = 0;
/* establish placeholder session interface which is expected
- * to be replaced by the connection host (e.g. packetizer) */
- tcpx->session = new_sessiondata_interface (tcpx, NULL, NULL);
- tcpx->is_session_owned = TRUE;
+ * to be replaced by the connection host (e.g. messagizer) */
+ udpx->session = new_sessionpkt_interface (udpx, NULL, NULL);
+ udpx->is_session_owned = TRUE;
return 0;
}
/**
- * etch_tcpconx_start()
+ * etch_udpconx_start()
* start means open. generally we would come through here with an accepted socket,
* in which case it is currently marked already open and we will return success.
* @return 0 success, -1 failure.
*/
-int etch_tcpconx_start (etch_tcp_connection *conx)
+int etch_udpconx_start (etch_udp_connection *conx)
{
etch_connection* cx = conx? &conx->cx: NULL;
ETCH_ASSERT(cx);
cx->on_event (conx, ETCH_CONXEVT_STARTING, 0, 0);
if (cx->is_started) return 0;
- return etch_tcpconx_open (conx, ETCH_CONX_NOT_RECONNECTING);
+ return etch_udpconx_open (conx, ETCH_CONX_NOT_RECONNECTING);
}
/**
- * etch_tcpconx_open()
+ * etch_udpconx_open()
* open connection to server based on host name/port set at construction.
* @note we have omitted reconnect logic for now, pending logic to detect
* listen socket down and initiate reconnect.
* @return 0 success, -1 failure (already open, hostname or socket error, etc)
*/
-int etch_tcpconx_open(etch_tcp_connection *conx, const int is_reconnect)
+int etch_udpconx_open(etch_udp_connection *conx, const int is_reconnect)
{
int result = -1, arc = 0, attempt = 0, is_already_open = TRUE;
apr_status_t apr_status;
@@ -518,6 +463,10 @@ int etch_tcpconx_open(etch_tcp_connectio
is_already_open = FALSE;
apr_thread_mutex_lock(g_etch_main_pool_mutex);
+ if (conx->remote_addr == NULL) {
+ conx->remote_addr = (apr_sockaddr_t *)apr_pcalloc(g_etch_main_pool, sizeof(apr_sockaddr_t));
+ conx->remote_addr->pool = g_etch_main_pool;
+ }
arc = apr_sockaddr_info_get(&cx->sockdata, cx->hostname, ETCH_DEFAULT_SOCKET_FAMILY, cx->port, 0, g_etch_main_pool);
apr_thread_mutex_unlock(g_etch_main_pool_mutex);
if (0 != arc) {
@@ -527,12 +476,13 @@ int etch_tcpconx_open(etch_tcp_connectio
if (!cx->socket)
{
- if (0 != (arc = new_tcpsocket (&cx->socket, cx->aprpool)))
+ if (0 != (arc = new_udpsocket (&cx->socket, cx->aprpool)))
{ eventx(conx, ETCH_CONXEVT_OPENERR, 3, (void*)(size_t)arc);
break;
}
/* set socket options here: NONBLOCK, TIMEOUT */
+ cx->set_socket_options(conx);
}
apr_status = apr_socket_timeout_get(cx->socket, &apr_timeout);
@@ -548,7 +498,17 @@ int etch_tcpconx_open(etch_tcp_connectio
apr_strerror(apr_status, buffer, sizeof(buffer));
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_ERROR, "could not set socket options: %s\n", buffer);
}
-
+
+ if (conx->is_server) {
+ apr_status = apr_socket_bind (cx->socket, cx->sockdata);
+ if(apr_status != APR_SUCCESS){
+ char buffer[1024];
+ apr_strerror(apr_status, buffer, sizeof(buffer));
+ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_ERROR, "could not bind server: %s\n", buffer);
+ } else {
+ cx->is_started = TRUE;
+ }
+ } else
while(attempt++ < ETCH_CONNECTION_DEFRETRYATTEMPTS+1)
{ /* possibly todo: configure number of retry attempts */
/* open socket */
@@ -566,7 +526,7 @@ int etch_tcpconx_open(etch_tcp_connectio
cx->on_event(conx, ETCH_CONXEVT_OPENERR, 2, (void*)(size_t)arc);
etch_sleep(ETCH_CONNECTION_DEFRETRYDELAYMS);
}
-
+
apr_status = apr_socket_timeout_set(cx->socket, apr_timeout);
if(apr_status != APR_SUCCESS){
char buffer[1024];
@@ -575,7 +535,7 @@ int etch_tcpconx_open(etch_tcp_connectio
}
} while(0);
-
+
if (cx->is_started && !is_already_open) result = 0;
eventx(conx, result? ETCH_CONXEVT_OPENERR: ETCH_CONXEVT_OPENED, 0, 0);
@@ -606,23 +566,12 @@ int etch_tcpconx_open(etch_tcp_connectio
}
/*
- * etch_tcpcconx_close()
- * close tcp connection
- */
-int etch_tcpconx_close(etch_tcp_connection* conx, const int is_linger)
-{
- return etch_tcpconx_closex(conx, is_linger, FALSE);
-}
-
-
-/*
- * etch_tcpcconx_closex()
- * close tcp connection
- * @param is_linger whether to set the socket to linger.
- * @param is_dtor true only if this call is from the tcp connection destructor.
+ * etch_udpcconx_closex()
+ * close udp connection
+ * @param is_dtor true only if this call is from the udp connection destructor.
* @return 0 success, -1 failure.
*/
-int etch_tcpconx_closex(etch_tcp_connection* conx, const int is_linger, const int is_dtor)
+int etch_udpconx_closex(etch_udp_connection* conx, const int is_dtor)
{
etch_status_t status = ETCH_SUCCESS;
int result = 0, arc = 0, is_locked = 0, is_teardown = 0, is_logged = 0;
@@ -655,18 +604,18 @@ int etch_tcpconx_closex(etch_tcp_connect
is_locked = TRUE;
cx->is_closing = TRUE;
cx->is_started = FALSE;
-
- if (NULL != cx->socket && is_linger)
- apr_socket_opt_set(cx->socket, APR_SO_LINGER, conx->linger);
- if (NULL != cx->socket && 0 != (arc = apr_socket_shutdown(cx->socket,APR_SHUTDOWN_READWRITE))) {
- cx->on_event(conx, ETCH_CONXEVT_CLOSERR, 3, (void*)(size_t)arc);
- result = 0;
+ if (NULL != cx->socket) {
+ apr_socket_shutdown(cx->socket, APR_SHUTDOWN_READWRITE);
+ if (0 != (arc = apr_socket_close(cx->socket))) {
+ cx->on_event(conx, ETCH_CONXEVT_CLOSERR, 3, (void*)(size_t)arc);
+ result = -1;
+ }
}
- if (NULL != cx->socket && 0 != (arc = apr_socket_close(cx->socket))) {
- cx->on_event(conx, ETCH_CONXEVT_CLOSERR, 3, (void*)(size_t)arc);
- result = -1;
+ // close receiver thread
+ if (conx->rcvlxr != NULL) {
+ etch_join(conx->rcvlxr->thread);
}
cx->socket = NULL;
@@ -680,11 +629,6 @@ int etch_tcpconx_closex(etch_tcp_connect
etch_mutex_unlock(cx->mutex);
}
- // join thread on client receiver thread
- if(conx->rcvlxr != NULL) {
- etch_join(conx->rcvlxr->thread);
- }
-
if (!is_teardown && !is_logged)
cx->on_event(conx, result? ETCH_CONXEVT_CLOSERR: ETCH_CONXEVT_CLOSED, 0, 0);
@@ -697,52 +641,60 @@ int etch_tcpconx_closex(etch_tcp_connect
return result;
}
+/*
+ * etch_udpcconx_close()
+ * close udp connection
+ */
+int etch_udpconx_close(etch_udp_connection* conx)
+{
+ return etch_udpconx_closex(conx, FALSE);
+}
/*
- * destroy_etch_tcp_connection()
- * etch_tcp_connection destructor
+ * destroy_etch_udp_connection()
+ * etch_udp_connection destructor
*/
-int destroy_etch_tcp_connection(void* thisx)
+int destroy_etch_udp_connection(void* thisx)
{
- etch_tcp_connection* tcpx = (etch_tcp_connection*)thisx;
- if (NULL == tcpx) return -1;
- tcpx->cx.on_event(tcpx, ETCH_CONXEVT_DESTROYING, 0, 0);
+ etch_udp_connection* udpx = (etch_udp_connection*)thisx;
+ if (NULL == udpx) return -1;
+ udpx->cx.on_event(udpx, ETCH_CONXEVT_DESTROYING, 0, 0);
- etch_tcpconx_closex (tcpx, FALSE, TRUE); /* close if open */
+ etch_udpconx_closex (udpx, TRUE); /* close if open */
- if (!is_etchobj_static_content(tcpx)) {
+ if (!is_etchobj_static_content(udpx)) {
/* free listener if any */
- etch_object_destroy(tcpx->rcvlxr);
- tcpx->rcvlxr = NULL;
+ etch_object_destroy(udpx->rcvlxr);
+ udpx->rcvlxr = NULL;
- /* free mem owned by tcpx */
- etch_destroy_connection (&tcpx->cx);
+ /* free mem owned by udpx */
+ etch_destroy_connection (&udpx->cx);
/* free session interface */
- if (tcpx->is_session_owned) {
- etch_object_destroy(tcpx->session);
- tcpx->session = NULL;
+ if (udpx->is_session_owned) {
+ etch_object_destroy(udpx->session);
+ udpx->session = NULL;
}
/* free transport interface */
- etch_object_destroy(tcpx->itd);
- tcpx->itd = NULL;
+ etch_object_destroy(udpx->itp);
+ udpx->itp = NULL;
}
- tcpx->cx.on_event(tcpx, ETCH_CONXEVT_DESTROYED, 0, 0);
- return destroy_objectex((etch_object*)tcpx);
+ udpx->cx.on_event(udpx, ETCH_CONXEVT_DESTROYED, 0, 0);
+ return destroy_objectex((etch_object*)udpx);
}
/*
- * new_tcpsocket()
+ * new_udpsocket()
*/
-int new_tcpsocket (apr_socket_t** outsock, apr_pool_t* mempool)
+int new_udpsocket (apr_socket_t** outsock, apr_pool_t* mempool)
{
int rv = 0;
apr_thread_mutex_lock(g_etch_main_pool_mutex);
- rv = apr_socket_create (outsock, APR_INET, SOCK_STREAM, APR_PROTO_TCP, g_etch_main_pool);
+ rv = apr_socket_create (outsock, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, g_etch_main_pool);
apr_thread_mutex_unlock(g_etch_main_pool_mutex);
return rv;
@@ -753,30 +705,30 @@ int new_tcpsocket (apr_socket_t** outsoc
/*
- * etch_tcpclient_receive()
+ * etch_udpclient_receive()
* receive data on socket
* returns length received or -1
*/
-int etch_tcpclient_receive (etch_tcp_connection *tcpx, unsigned char* buf, const size_t buflen, int* rc)
+int etch_udpclient_receive (etch_udp_connection *udpx, unsigned char* buf, const size_t buflen, int* rc)
{
- return etch_tcpclient_receivex (tcpx, buf, buflen, 0, rc);
+ return etch_udpclient_receivex (udpx, buf, buflen, 0, rc);
}
/*
- * etch_tcpclient_receivex()
+ * etch_udpclient_receivex()
* receive data on socket with specified timeout, into specified character buffer.
* @return number of bytes received on success; otherwise -2 (ETCH_OTHER_END_CLOSED)
* if peer closed, or -1 if error.
*/
-int etch_tcpclient_receivex (etch_tcp_connection *tcpx, unsigned char* buf, const size_t buflen, const int timeout_ms, int* rc)
+int etch_udpclient_receivex (etch_udp_connection *udpx, unsigned char* buf, const size_t buflen, const int timeout_ms, int* rc)
{
int result = 0, arc = 0, is_eod = 0, eventid = 0;
int64 existing_timeout_us = 0;
apr_size_t datalen = 0;
- etch_connection *cx = tcpx? &tcpx->cx: NULL;
+ etch_connection *cx = udpx? &udpx->cx: NULL;
if (NULL == cx) return -1;
- cx->on_event(tcpx, ETCH_CONXEVT_RECEIVING, 0, 0);
+ cx->on_event(udpx, ETCH_CONXEVT_RECEIVING, 0, 0);
if (timeout_ms) {
apr_socket_timeout_get(cx->socket, &existing_timeout_us);
@@ -785,7 +737,7 @@ int etch_tcpclient_receivex (etch_tcp_co
datalen = buflen; /* BLOCK on receive data here */
- arc = apr_socket_recv (cx->socket, (char*)buf, &datalen);
+ arc = apr_socket_recvfrom (udpx->remote_addr, cx->socket, 0, (char*)buf, &datalen);
is_eod = arc == APR_EOF;
if (arc && !is_eod) {
@@ -802,22 +754,22 @@ int etch_tcpclient_receivex (etch_tcp_co
eventid = ETCH_CONXEVT_RECEIVERR;
result = -1;
}
- cx->on_event (tcpx, eventid, arc, 0);
+ cx->on_event (udpx, eventid, arc, 0);
}
else
if (0 == datalen)
- { cx->on_event(tcpx, ETCH_CONXEVT_PEERCLOSED, 0, 0);
+ { cx->on_event(udpx, ETCH_CONXEVT_PEERCLOSED, 0, 0);
result = ETCH_OTHER_END_CLOSED;
}
else /* check for signal to shut down server */
if (datalen == ETCH_SHUTDOWNSIGNALSIZE
&& 0 == memcmp(buf, ETCH_SHUTDOWNSIGNAL, ETCH_SHUTDOWNSIGNALSIZE))
- { cx->on_event(tcpx, ETCH_CONXEVT_SHUTDOWN, 0, 0);
+ { cx->on_event(udpx, ETCH_CONXEVT_SHUTDOWN, 0, 0);
result = ETCH_SHUTDOWN_NOTIFIED;
}
else
- { cx->on_event (tcpx, ETCH_CONXEVT_RECEIVED, is_eod, (char*)datalen);
+ { cx->on_event (udpx, ETCH_CONXEVT_RECEIVED, is_eod, (char*)datalen);
if (-1 != result) result = (int) datalen; /* return bytecount */
}
@@ -829,11 +781,11 @@ int etch_tcpclient_receivex (etch_tcp_co
}
-static etch_status_t etch_tcp_client_cleanup(void* p)
+static etch_status_t etch_udp_client_cleanup(void* p)
{
etch_status_t rv = ETCH_SUCCESS;
etch_status_t status = ETCH_SUCCESS;
- etch_tcp_client* client = p;
+ etch_udp_client* client = (etch_udp_client *)p;
status = etch_object_destroy(client->thread);
// TODO: check status
@@ -849,59 +801,51 @@ static etch_status_t etch_tcp_client_cle
/**
- * destroy_etch_tcp_client()
- * tcp client (tcp connection read listener) destructor.
+ * destroy_etch_udp_client()
+ * udp client (udp connection read listener) destructor.
*/
-int destroy_etch_tcp_client(void* data)
+int destroy_etch_udp_client(void* data)
{
- etch_tcp_client* thisx = (etch_tcp_client*)data;
+ etch_udp_client* thisx = (etch_udp_client*)data;
etch_status_t rv = ETCH_SUCCESS;
- rv = etch_tcp_client_cleanup(thisx);
+ rv = etch_udp_client_cleanup(thisx);
return rv;
}
/**
- * etch_tcpclient_listenerproc()
- * tcp socket receive thread procedure.
+ * etch_udpclient_listenerproc()
+ * udp socket receive thread procedure.
*/
-static void etch_tcp_client_receiver_proc(void* data)
+static void etch_udp_client_receiver_proc(void* data)
{
etch_thread_params* params = (etch_thread_params*)data;
int result = 0, arc = 0;
- etch_tcp_connection* tcpx = (etch_tcp_connection*) params->data;
- etch_connection* cx = &tcpx->cx;
+ etch_udp_connection* udpx = (etch_udp_connection*) params->data;
+ etch_connection* cx = &udpx->cx;
const int thread_id = params->etch_thread_id;
const int blen = cx->bufsize? cx->bufsize: ETCH_CONX_DEFAULT_BUFSIZE;
//params->data->threas = params->threadob
etch_flexbuffer* fbuf = new_flexbuffer(blen);
- cx->on_event(tcpx, ETCH_CONXEVT_RCVPUMP_START, 0, 0);
+ cx->on_event(udpx, ETCH_CONXEVT_RCVPUMP_START, 0, 0);
while(cx->is_started)
{
etch_flexbuf_clear(fbuf); /* for debugging otherwise unnecessary */
- cx->on_event(tcpx, ETCH_CONXEVT_RCVPUMP_RECEIVING, thread_id, 0);
+ cx->on_event(udpx, ETCH_CONXEVT_RCVPUMP_RECEIVING, thread_id, 0);
- /* receive data from tcp socket into buffer owned by flexbuffer.
+ /* receive data from udp socket into buffer owned by flexbuffer.
* note that if this receive were to stop blocking, for example
* if the peer went down without it being detected here, we would
* see unfettered looping of this listener procedure. BLOCK.
*/
- result = etch_tcpclient_receive (tcpx, fbuf->buf, blen, &arc);
-
- switch(result)
- {
- case ETCH_THIS_END_CLOSED: case ETCH_OTHER_END_CLOSED:
- /* a socket is down so close connection and exit thread */
- cx->is_started = FALSE; /* this is new: exit thread now */
- result = 0; /* was break here but next line catches it*/
- }
+ result = etch_udpclient_receive (udpx, fbuf->buf, blen, &arc);
if (!cx->is_started) break; /* client shutdown */
- if (result < 0)
- { cx->on_event(tcpx, ETCH_CONXEVT_RCVPUMP_ERR, arc, 0);
- break;
+ if (result < 0) {
+ // print error message, but ignore it and continue
+ cx->on_event(udpx, ETCH_CONXEVT_RCVPUMP_ERR, arc, 0);
}
etch_flexbuffer_reset_to (fbuf, result); /* received (result) bytes */
@@ -910,34 +854,34 @@ static void etch_tcp_client_receiver_pro
cx->on_data (cx, 0, result, fbuf);
}
- tcpx->session->session_notify (tcpx->session->thisx, new_etch_event(0, ETCHEVT_SESSION_DOWN));
+ udpx->session->session_notify (udpx->session->thisx, new_etch_event(0, ETCHEVT_SESSION_DOWN));
- cx->on_event(tcpx, ETCH_CONXEVT_RCVPUMP_STOP, result, (void*) (size_t) thread_id);
+ cx->on_event(udpx, ETCH_CONXEVT_RCVPUMP_STOP, result, (void*) (size_t) thread_id);
etch_object_destroy(fbuf);
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "leaving listener thread ...\n");
}
/**
- * new_tcp_client()
- * tcp client (tcp connection read listener) constructor.
+ * new_udp_client()
+ * udp client (udp connection read listener) constructor.
* this class is an afterthought so it is backwards, the connection hosting
* the client class. maybe we'll change it later to move to client and server
* connection class symmetry. however this works. when a client connection
* needs a read listener it hosts and owns one of these.
- * @param tcpx the tcp connection which is the client's receive listener.
+ * @param udpx the udp connection which is the client's receive listener.
*/
-etch_tcp_client* new_tcp_client (etch_tcp_connection* tcpx)
+etch_udp_client* new_udp_client (etch_udp_connection* udpx)
{
- etch_tcp_client* newclient = NULL;
+ etch_udp_client* newclient = NULL;
- newclient = (etch_tcp_client*)new_object(sizeof(etch_tcp_client), ETCHTYPEB_TCPCLIENT, CLASSID_TCP_CLIENT);
+ newclient = (etch_udp_client*)new_object(sizeof(etch_udp_client), ETCHTYPEB_UDPCLIENT, CLASSID_UDP_CLIENT);
- ((etch_object*)newclient)->destroy = destroy_etch_tcp_client;
- newclient->cxlisten = tcpx; /* client's receive listener is tcpx */
+ ((etch_object*)newclient)->destroy = destroy_etch_udp_client;
+ newclient->cxlisten = udpx; /* client's receive listener is udpx */
- newclient->thread = new_thread(etch_tcp_client_receiver_proc, tcpx);
+ newclient->thread = new_thread(etch_udp_client_receiver_proc, udpx);
if(newclient->thread == NULL) {
- tcpx->cx.on_event (tcpx, ETCH_CONXEVT_STARTERR, 1, 0);
+ udpx->cx.on_event (udpx, ETCH_CONXEVT_STARTERR, 1, 0);
etch_object_destroy(newclient);
newclient = NULL;
}
@@ -952,15 +896,15 @@ etch_tcp_client* new_tcp_client (etch_tc
* on request and destroys them at thread exit. */
//newclient->threadpool = new_threadpool (ETCH_THREADPOOLTYPE_FREE, 1);
- /* data passed to threads will be either this object, or tcp connection
+ /* data passed to threads will be either this object, or udp connection
* objects. here we configure thread mgr to not free these at thread exit */
//newclient->threadpool->is_free_data = FALSE;
//newclient->threadpool->is_data_etchobject = TRUE;
//newclient->is_started = TRUE;
/* start the receive thread on the local thread manager */
- //if (NULL == newclient->threadpool->run(newclient->threadpool, etch_tcp_client_receiver_proc, tcpx)) {
- // tcpx->cx.on_event (tcpx, ETCH_CONXEVT_STARTERR, 1, 0);
+ //if (NULL == newclient->threadpool->run(newclient->threadpool, etch_udp_client_receiver_proc, udpx)) {
+ // udpx->cx.on_event (udpx, ETCH_CONXEVT_STARTERR, 1, 0);
// newclient->destroy(newclient);
// newclient = NULL;
//}
@@ -970,95 +914,50 @@ etch_tcp_client* new_tcp_client (etch_tc
/**
- * etch_tcpclient_start_listener
+ * etch_udpclient_start_listener
* start a receive listener thread on the client connection
*/
-int etch_tcpclient_start_listener (etch_tcp_connection *tcpx)
+int etch_udpclient_start_listener (etch_udp_connection *udpx)
{
- etch_connection *cx = tcpx? &tcpx->cx: NULL;
- //if (NULL == cx || NULL != tcpx->rcvlxr) return -1;
+ etch_connection *cx = udpx? &udpx->cx: NULL;
+ //if (NULL == cx || NULL != udpx->rcvlxr) return -1;
if (NULL == cx)
return -1;
- else if (NULL != tcpx->rcvlxr)
+ else if (NULL != udpx->rcvlxr)
{
- etch_object_destroy(tcpx->rcvlxr);
- tcpx->rcvlxr = NULL;
+ etch_object_destroy(udpx->rcvlxr);
+ udpx->rcvlxr = NULL;
}
- tcpx->rcvlxr = new_tcp_client (tcpx);
+ udpx->rcvlxr = new_udp_client (udpx);
- return NULL == tcpx->rcvlxr? -1: 0;
+ return NULL == udpx->rcvlxr? -1: 0;
}
/**
- * etch_tcpclient_stop_listener
+ * etch_udpclient_stop_listener
* stop the receive listener thread on the client connection
*/
-int etch_tcpclient_stop_listener (etch_tcp_connection *tcpx)
+int etch_udpclient_stop_listener (etch_udp_connection *udpx)
{
int result = 0;
- etch_tcp_client* tcpclient = NULL;
- etch_tcp_connection* clientconx = NULL;
- etch_connection *cx = tcpx? &tcpx->cx: NULL;
- if (NULL == cx || NULL == tcpx->rcvlxr) return -1;
+ etch_udp_client* udpclient = NULL;
+ etch_udp_connection* clientconx = NULL;
+ etch_connection *cx = udpx? &udpx->cx: NULL;
+ if (NULL == cx || NULL == udpx->rcvlxr) return -1;
- tcpclient = tcpx->rcvlxr;
- clientconx = tcpclient->cxlisten;
+ udpclient = udpx->rcvlxr;
+ clientconx = udpclient->cxlisten;
- tcpclient->is_started = FALSE;
+ udpclient->is_started = FALSE;
- result = etch_tcpconx_close (clientconx, FALSE);
+ result = etch_udpconx_close (clientconx);
// aprrc = apr_socket_send (clientconx->cx.socket, ETCH_SHUTDOWNSIGNAL, &datalen);
- // result = etch_tcpclient_send (tcpx, ETCH_SHUTDOWNSIGNAL, ETCH_SHUTDOWNSIGNALSIZE, &aprrc);
+ // result = etch_udpclient_send (udpx, ETCH_SHUTDOWNSIGNAL, ETCH_SHUTDOWNSIGNALSIZE, &aprrc);
return result;
//return aprrc == 0? 0: -1;
}
-
-
-
-
-
-
-/*
- * etch_tcpclient_stop()
- */
-int etch_tcpclient_stop (etch_tcp_connection *conx)
-{
- return etch_tcpconx_close(conx, 0);
-}
-
-
-/* - - - - - - - - - - - - - - - - -
- * tcp client :: i_transportdata
- * - - - - - - - - - - - - - - - - -
- */
-
-
-
-/* - - - - - - - - - - - - - - -
- * tcpclient :: i_transport
- * - - - - - - - - - - - - - - -
- */
-
-/**
- * etch_tcpclient_get_session
- * i_transport::get_session implementation
- */
-i_session* etch_tcpclient_get_session (void* data)
-{
- etch_tcp_connection* thisx = (etch_tcp_connection*)data;
- ETCH_ASSERT(is_etch_tcpconnection(thisx));
- return (i_session*)thisx->session;
-}
-
-
-
-
-
-
-
-