You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@etch.apache.org by jd...@apache.org on 2009/04/22 19:25:51 UTC
svn commit: r767594 [21/43] - in /incubator/etch/trunk/binding-c/runtime/c:
./ ext/ ext/hashtab/ ext/lib/ inc/ lib/ project/ project/$etchstop/
project/bin/ project/etch/ project/logcli/ project/logsrv/ project/notes/
project/test/ project/test/logcli/...
Added: incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,876 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_stub.c
+ * contains the base runtime code for either type of stub.
+ *
+ * the generated stub_xxxx_server will define the thread procedures for each message
+ * type in the service. for each such type, it will set the virtual stub helpers in
+ * the type objects, to point to these thread functions. each such thread procedure
+ * will call the user-coded calculation in the impl_xxxx_server for that message
+ * type; for example, for an "add" message, it will get the fields from the service
+ * value factory which are the arguments to the calculation, call the calculation in
+ * the server impl, insert the result of the calculation into the reply message, and
+ * call the delivery service transport_message to send the reply back to the sender.
+ */
+
+#include "apr_network_io.h"
+#include "apr_general.h"
+
+#include "etch_svcobj_masks.h"
+#include "etch_stub.h"
+#include "etch_global.h"
+#include "etchexcp.h"
+#include "etchlog.h"
+
+int etchstub_session_notify_ex (void*, void*, etch_event*);
+int etchstub_session_message(etch_stub*, etch_who*, etch_message*);
+int etchstub_session_control(etch_stub*, etch_event*, objmask*);
+int etchstub_session_notify (etch_stub*, etch_event*);
+objmask* etchstub_session_query (etch_stub*, objmask*);
+i_objsession* etchstub_get_session_callbacks_from (void* _obj);
+
+char* ETCHSTUB = "STUB";
+char* scstr = "session control", *snstr = "session notify", *sqstr = "session query";
+char* logmask1 = "%s not routed to client\n", *logmask2 = "%s not registered\n";
+
+
+
+#if(0)
+____________________________________________________________________________________
+the inheritance hierarchy becomes indefinite at this point. a stub should be able
+to query a service object for a particular interface, down through its inheritance
+chain. for example, we need to find who has implemented obj_session and call methods
+on it. a use case is a custom transport implemented as a dynamically loaded dll.
+the stub needs therefore to be able to call into objects it has never seen before.
+
+for the present we will make some assumptions as to the inheritance hierarchy, that
+being that given a service interface, we know its inheritance hierarchy. following
+is a diagram illustrating the inheritance of a service interface named XXXX.
+all arrows direct down unless indicated by an arrowhead. (e) indicates extends,
+(i) indicates implements. this diagram is upside-down to the way we normally see
+the hierarchy.
+ (obj)
+ |
+ XXXX <- - - - - - - - - - (stub)
+ | |
+ (e) --------- ---------- (e)
+ | |
+ XXXX_server remote_XXXX
+ | \ (i) | (e)
+ (i) | ------> remote_XXXX_server
+ obj_session <-- base_XXXX_server
+ (e) |
+ impl_XXXX_server
+____________________________________________________________________________________
+#endif
+
+
+#if(0)
+
+IMPL_XXXX_SERVER
+ - ctor(REMOTE_XXXX_CLIENT)
+ - usermethod_1_impl(); ...
+ - usermethod_n_impl();
+
+ BASE_XXXX_SERVER
+ - usermethod_1_stub(); ...
+ - usermethod_n_stub();
+ OBJSESSION
+ - _session_control; _session_notify; _session_query; (stubs)
+ XXXX_SERVER
+ - any server-directed methods go here
+ XXXX
+ - see user method impls above
+
+
+STUBXXXX<T extends XXXX> extends STUBBASE<T>
+ - ctor(DeliveryService, pool, pool);
+ - stubhelper();
+
+ STUBBASE<T> (DeliveryService, pool, pool)
+ T obj;
+ DeliveryService, Pool, Pool;
+ stub_helper();
+
+ STUBPOOLRUNNABLE
+ - set(stub_helper, who, msg); (set state, not in java)
+ - stubhelper(stub, delsvc, obj, who, msg);
+
+ SESSIONMESSAGE
+ - session_message(who, message);
+ { stubhelper = message.type.get_helper()
+ stubhelper(delsvc, obj, who, msg); // the stubhelper impl calls threadpool run()
+ }
+ - session_control; session_notify; session_query;
+
+
+STUBXXXSERVER (generated, hand-codable)
+ - ctor(DeliveryService, XXXXSERVER, pool, pool);
+ - for each type in the vf,
+ -- implement a run method (threadproc) for the message type
+ -- implement a stub helper method which calls the threadpool run using the threadproc above
+
+ STUBXXXX
+
+#endif
+
+
+/* - - - - - - - - - - - - - - - - - -
+ * stub base method implementations
+ * - - - - - - - - - - - - - - - - - -
+ */
+
+/**
+ * destroy_stub()
+ * etch_stub destructor. destroy stub base object.
+ * the stub base does not destroy its xxxx_either_stub wrapper.
+ */
+int destroy_stub(etch_stub* thisx)
+{
+ if (thisx->refcount > 0 && --thisx->refcount > 0) return -1;
+
+ if (!is_etchobj_static_content(thisx))
+ {
+ /* note that a delivery service session is the stub's session.
+ * the delivery service owns the stub, and the stub owns this shared
+ * session. the stub wrapper is the session interface's thisx.
+ * the delivery service gets xxxx_either_stub* stubbobj from its
+ * isessionmsg->thisx, and etch_stub* stubbase from stubobj.stub_base.
+ * destroying the stub will destroy the shared i_sessionmessage, so
+ * the delivery service's session will become invalid once its stub
+ * is destroyed. see transport.destroy_delivery_service_stub().
+ */
+ if (thisx->isessionmsg)
+ thisx->isessionmsg->destroy(thisx->isessionmsg);
+
+ if (thisx->obj && thisx->is_implobj_owned)
+ thisx->obj->destroy(thisx->obj);
+ }
+
+ return destroy_objectex((objmask*) thisx);
+}
+
+
+/**
+ * new_stub()
+ * etch_stub (stub base) constructor.
+ */
+etch_stub* new_stub (void* implobj, unsigned char stubtype,
+ i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp)
+{
+ i_sessionmessage* ism = NULL;
+
+ etch_stub* stubbase = (etch_stub*) new_object
+ (sizeof(etch_stub), ETCHTYPEB_STUB, CLASSID_STUB);
+
+ ETCH_ASSERT(implobj && stubtype && qp && fp);
+ ETCH_ASSERT(is_etch_ideliverysvc(ids));
+
+ stubbase->stub_type = stubtype;
+ stubbase->destroy = destroy_stub;
+
+ stubbase->obj = implobj; /* server or client impl e.g. i_xxxx_server* */
+ stubbase->delivery_service = ids;
+
+ /* instantiate i_sessionmessage session interface. note that the interface's
+ * thisx pointer is this base stub object, and that the base stub's container
+ * xxxx_either_stub* is the base stub.stubobj.
+ */
+ ism = new_sessionmsg_interface (stubbase, etchstub_session_message, NULL);
+ stubbase->isessionmsg = ism;
+ stubbase->session_message = ism->session_message = etchstub_session_message;
+ stubbase->session_control = ism->session_control = etchstub_session_control;
+ stubbase->session_notify = ism->session_notify = etchstub_session_notify;
+ stubbase->session_query = ism->session_query = etchstub_session_query;
+
+ /* set delivery service session interface to be the stub's i_sessionmessage.
+ * in a c binding translation from java binding, a reference to a stub object
+ * is only via via the delivery service's session_message interface, shared
+ * between the stub and the delivery service.
+ * the delivery service must have provided i_transportmessage implementations,
+ * in particular for set_session(). the instantiator of the delivery service
+ * must therefore override the delivery service's i_transportmessage.
+
+ /* fyi: ids->itm->thisx is mailbox manager.
+ * ids->itm is mboxmgr's transport interface, which is the delivery service.
+ * set session of next lower level (delivery service) to the stub's session.
+ */
+ ids->itm->set_session (ids, stubbase->isessionmsg);
+
+ /* copy impl's i_objsession to the stub for convenience */
+ stubbase->impl_callbacks = etchstub_get_session_callbacks_from (implobj);
+
+ return stubbase;
+}
+
+
+/**
+ * etchstub_get_session_callbacks_from()
+ * extract objsession interface from specified stub implementor object.
+ */
+i_objsession* etchstub_get_session_callbacks_from (void* _obj)
+{
+ i_objsession* iobjsession = NULL;
+ objmask* obj = (objmask*) _obj;
+ const int this_objtype = obj? obj->obj_type: 0;
+
+ switch(this_objtype)
+ {
+ case ETCHTYPEB_EXESERVERIMPL:
+ { xxxx_server_impl* server_impl = (xxxx_server_impl*) obj;
+ iobjsession = server_impl->iobjsession;
+ break;
+ }
+
+ case ETCHTYPEB_EXECLIENTIMPL:
+ { xxxx_client_impl* client_impl = (xxxx_client_impl*) obj;
+ iobjsession = client_impl->iobjsession;
+ break;
+ }
+
+ case ETCHTYPEB_EXESERVERBASE:
+ { i_xxxx_server* iserver = (i_xxxx_server*) obj;
+ xxxx_server_impl* server_impl = (xxxx_server_impl*) iserver->thisx;
+ ETCH_ASSERT(server_impl && server_impl->obj_type == ETCHTYPEB_EXESERVERIMPL);
+ iobjsession = server_impl->iobjsession;
+ break;
+ }
+
+ case ETCHTYPEB_EXECLIENTBASE:
+ { i_xxxx_client* iclient = (i_xxxx_client*) obj;
+ xxxx_client_impl* client_impl = (xxxx_client_impl*) iclient->thisx;
+ ETCH_ASSERT(client_impl && client_impl->obj_type == ETCHTYPEB_EXECLIENTIMPL);
+ iobjsession = client_impl->iobjsession;
+ break;
+ }
+ }
+
+ return iobjsession;
+}
+
+
+/**
+ * etchstub_put_exception
+ * put serializable exception to reply message.
+ * @param replymsg the message, caller retains.
+ * @param fromobj the etch object or etch_exception wrapping the etchexception,
+ * caller retains.
+ */
+int etchstub_put_exception (etch_stub* stub, etch_message* replymsg, objmask* fromobj)
+{
+ int result = 0;
+ etchlog(ETCHSTUB, ETCHLOG_INFO, "throwing remote exception\n");
+
+ /* insert serializable exception to reply message */
+ result = message_throw_from (replymsg, fromobj);
+
+ /* notify the session with a copy of the exception, relinquished */
+ stub->session_notify (stub, new_etch_exception_from (fromobj));
+ return result;
+}
+
+
+/**
+ * etchstub_put_resultobj
+ * insert specified result object to message.
+ * @param replymsg the message, caller retains.
+ * @param resultobj the result object, caller relinquishes regardless of result.
+ * @return 0 success, -1 failure.
+ */
+int etchstub_put_resultobj (etch_stub* stub, etch_message* replymsg, objmask* resultobj)
+{
+ int result = 0;
+ etch_field* key_result = builtins._mf_result;
+
+ /* check if the message already has a result object, i.e., an exception */
+ if (message_get (replymsg, key_result))
+ resultobj->destroy(resultobj); /* if so, discard the passed result */
+ else /* resultobj is relinquished here regardless of result */
+ result = message_put (replymsg, clone_field(key_result), (objmask*) resultobj);
+
+ return result;
+}
+
+
+/**
+ * etchstub_send_reply
+ * private method invoked by all "stub helpers" (message logic implementation
+ * runners) for messages requiring a reply, in order to instantiate a reply
+ * message and transmit it back to sender.
+ */
+int etchstub_send_reply (etch_stub* stub, i_delivery_service* dsvc,
+ etch_who* whofrom, etch_message* msg, objmask* resultobj)
+{
+ int result = 0;
+ const int is_exception_resobj = is_exception(resultobj);
+ const int is_exception_message = is_exception(msg);
+ const int is_exception_result = is_exception_resobj || is_exception_message;
+ etch_type* newtype = NULL; /* see comment following */
+
+ /* instantiate the reply message
+ * note that if we are called for a one-way message, it is only if an
+ * exception was thrown by the implementation, and the exception is
+ * therefore to be the reply. in such a case, the etch compiler generates
+ * a message type as newtype above, which is the second parameter to
+ * message_reply(), in order that in_reply_to can be instantiated.
+ * (our example below makes newtype the 1-way exception reply type in
+ * this case). for two-way messages, newtype, and thus the second parameter
+ * to message_reply() is null.
+ */
+ etch_message* replymsg = message_reply (msg, newtype);
+
+ if (NULL == replymsg && is_exception_result)
+ { newtype = builtins._mt__exception;
+ replymsg = message_reply (msg, newtype);
+ }
+
+ if (NULL == replymsg)
+ { etch_type* replytype = newtype? newtype: message_type(msg);
+ char* logmask = "could not create reply message for type %s\n";
+ etchlog(ETCHSTUB, ETCHLOG_ERROR, logmask, replytype->aname);
+ return -1;
+ }
+
+ if (is_exception_resobj) /* the result wraps, or is, an exception */
+ etchstub_put_exception (stub, replymsg, resultobj);
+ else
+ if (is_exception_message) /* the message wraps an exception */
+ etchstub_put_exception (stub, replymsg, (objmask*) msg);
+
+ /* insert result object to message. resultobj is relinquished here.
+ * note that if message_put() or transport_message() were to fail,
+ * the other side would not receive a result, and would be aware
+ * only via timeout or via the peer connection dropping.
+ */
+ result = etchstub_put_resultobj (stub, replymsg, resultobj);
+
+ if (0 == result) /* forward reply message for serialization */
+ result = dsvc->itm->transport_message (dsvc, whofrom, replymsg);
+
+ return result;
+}
+
+
+/**
+ * etchstub_validate_args
+ * private method invoked by all "stub helpers" to validate parameters
+ * to the individual stub helper and populate some objects for the helper.
+ */
+int etchstub_validate_args (etch_stub* stub, i_delivery_service* dsvc,
+ etch_message* msg, void* client, default_value_factory** vf,
+ void** vfimpl, void** climpl)
+{
+ int result = 0;
+ i_xxxx_client* iclient = NULL;
+ objmask* clientimpl = NULL, *valufactimpl = NULL;
+ /* any assertion failure in this method indicates a coding error in etch core */
+ ETCH_ASSERT(stub && dsvc && client && msg && stub->params && vf && vfimpl && climpl);
+
+ iclient = (i_xxxx_client*) client;
+ clientimpl = iclient->thisx;
+ ETCH_ASSERT(is_etch_serverimpl(clientimpl));
+ *climpl = clientimpl;
+
+ *vf = (default_value_factory*) stub->params->in_valufact;
+ ETCH_ASSERT(is_etch_valuefact(*vf));
+ valufactimpl = (*vf)->impl;
+ ETCH_ASSERT(is_etch_valuefactimpl(valufactimpl));
+ *vfimpl = valufactimpl;
+
+ return result;
+}
+
+
+/**
+ * etchstub_session_notify_ex()
+ *
+ * @param thisx the object which will receive an exception if any.
+ * may be the same object as _obj. caller retains.
+ *
+ * @param _obj caller retains. usually a xxxx_server_impl. may be null, or
+ * may be the same object as thisx. obj will belong to a known and limited set
+ * of classes, e.g. ETCHTYPEB_EXESERVERIMPL, so we can test _obj.obj_type
+ * if needed, and cast _obj to one of the mask objects (etch_svcobj_masks.h)
+ * such as xxxx_server_impl, in order to reference the maskable content.
+ *
+ * @param evt a notification event or a throwable exception, caller relinquishes.
+ *
+ * @return 0 success, -1 failure.
+ */
+int etchstub_session_notify_ex (void* thisx, void* _obj, etch_event* evt)
+{
+ int result = -1, is_evtparm_relinquished = FALSE;
+
+ /* this call arrives from type "stub helpers" implemented in xxxx_server_stub.
+ * these are thread procedures with arguments delivery service, some object
+ * _obj (possibly implementing objsession or throwable), who, and message,
+ * viz: int (*stubhelper) (stub, deliverysvc, _obj, sender, message);
+ * so from argument _obj, we extract the objsession interface and if present,
+ * call its _session_notify.
+ */
+
+ /* a server implementation (xxxx_server_impl) can request notifications of
+ * exceptions and the like by implementing and registering i_objsession
+ * callbacks. the presence of these function pointers in the implementation
+ * serves as the indicator of whether to forward the notification.
+ */
+ i_objsession* impl_callbacks = etchstub_get_session_callbacks_from (_obj);
+
+ etch_session_notify impl_session_notify = impl_callbacks?
+ impl_callbacks->_session_notify: NULL;
+
+ if (impl_session_notify) /* if impl has requested this event, forward it */
+ { /* event is relinquished to notify handlers by contract */
+ result = impl_session_notify (impl_callbacks, evt);
+ is_evtparm_relinquished = TRUE;
+ }
+
+ if (0 != result)
+ { char* logmask = impl_session_notify? logmask1: logmask2;
+ etchlog(ETCHSTUB, ETCHLOG_DEBUG, logmask, snstr);
+ }
+
+ if (!is_evtparm_relinquished)
+ ETCHOBJ_DESTROY(evt);
+
+ return result;
+}
+
+
+/* - - - - - - - - - - - - - - - - - - - - -
+ * stub helper threadpool execution support
+ * - - - - - - - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etchstub_loghelper()
+ * convenience method to log entry or exit of a stub helper function.
+ */
+void etchstub_loghelper (opaque_stubhelper f,
+ const int result, const int thread_id, const int is_start)
+{
+ if (config.loglevel <= ETCHLOG_DEBUG)
+ { char onthread[32];
+ if (thread_id)
+ sprintf(onthread, "on thread %d", thread_id);
+ else *onthread = '\0';
+
+ if (is_start)
+ etchlog(ETCHSTUB, ETCHLOG_DEBUG, "start stub runner %x %s\n",
+ f, onthread);
+ else etchlog(ETCHSTUB, ETCHLOG_DEBUG, "exit stub runner %x (%d) %s\n",
+ f, result, onthread);
+ }
+}
+
+
+/**
+ * etchstub_proxy_threadproc()
+ * a thread procedure conforming to the signature expected by etch_threadpool,
+ * which accepts a stub parameter bundle as params.data, unbundles the parameters,
+ * and invokes the included stub helper function with those parameters.
+ */
+void etchstub_proxy_threadproc (etch_threadparams* tp)
+{
+ int result = 0;
+ etchstub_runparams* p = tp? tp->data: NULL;
+ ETCH_ASSERT(p && p->run && (p->sig == ETCH_STUBPARAMS_SIGNATURE));
+ etchstub_loghelper (p->run, 0, tp->etch_thread_id, 1);
+
+ /* run "stub helper" procedure */
+ result = p->run (p->stub, p->ds, p->server, p->who, p->msg);
+
+ etchstub_loghelper (p->run, result, tp->etch_thread_id, 0);
+ /* etch_threadparams is marked as owning data so we don't destroy p here */
+}
+
+
+/**
+ * new_etch_stubparams()
+ * bundle up stub runner parameters in order they can become
+ * etch_threadparams.data, passed in to the etchstub_proxy_threadproc.
+ */
+etchstub_runparams* new_etch_stubparams (etch_stub* stub, opaque_stubhelper runproc,
+ i_delivery_service* ds, i_xxxx_server* server, etch_who* whofrom, etch_message* msg)
+{
+ etchstub_runparams* p = etch_malloc(sizeof(etchstub_runparams), 0);
+ p->sig = ETCH_STUBPARAMS_SIGNATURE;
+ p->stub = stub;
+ p->run = runproc;
+ p->ds = ds;
+ p->who = whofrom;
+ p->msg = msg;
+ p->server = server;
+ return p;
+}
+
+
+/**
+ * etchstub_run()
+ * execute the service method helper function (stub helper). if no threadpool
+ * is specified, execute the helper function inline (on caller's thread);
+ * otherwise wrap the stub helper arguments and launch a thread on the supplied
+ * threadpool to execute the helper function.
+ * @param stub caller retains.
+ * @param runproc the service function for this message type.
+ * @param threadpool caller retains.
+ * @param server caller retains.
+ * @param who caller retains.
+ * @param msg caller retains.
+ * @return 0 success, -1 failure.
+ */
+int etchstub_run (etch_stub* stub, opaque_stubhelper runproc, etch_threadpool* threadpool,
+ i_xxxx_server* server, etch_who* who, etch_message* msg)
+{
+ int result = 0;
+ i_delivery_service* ds = stub->delivery_service;
+ ETCH_ASSERT(runproc && ds && server && msg);
+
+ if (threadpool)
+ {
+ /* a pointer to this parameter bundle will become etch_threadparams.data.
+ * etch_threadparams.is_own_data determines if the thread frees data on exit.
+ * is_own_data assumes the value of threadpool.is_free_data, which is true
+ * by default. so unless we have reset is_free_data on this pool, the pool
+ * thread will free this allocation at thread exit. and since also by default
+ * pool->is_data_etchobject is false, the deallocation will use etch_free;
+ */
+ etchstub_runparams* p = new_etch_stubparams(stub, runproc, ds, server, who, msg);
+
+ etch_thread* poolthread /* run stub helper function on a pool thread */
+ = threadpool->run (threadpool, etchstub_proxy_threadproc, p);
+
+ if (poolthread) /* switch context and block on pool thread exit */
+ { const int id = poolthread->params.etch_thread_id;
+ etchlog(ETCHSTUB, ETCHLOG_XDEBUG, "stub runner joining pool thread %d ...\n", id);
+ etch_join (poolthread);
+ }
+ else result = -1;
+ }
+ else
+ { etchstub_loghelper(runproc, 0, 0, 1);
+
+ /* run stub helper function on this thread */
+ result = runproc (stub, ds, server, who, msg);
+
+ etchstub_loghelper(runproc, result, 0, 0);
+ }
+
+ return result;
+}
+
+
+/* - - - - - - - - - -
+ * i_sessionmessage
+ * - - - - - - - - - -
+ */
+
+/**
+ * etchstub_session_message()
+ * @param whofrom caller retains, can be null.
+ * @param msg caller relinquishes
+ * @return 0 (message handled), or -1 (error, closed, or timeout)
+ */
+int etchstub_session_message (etch_stub* thisx, etch_who* whofrom, etch_message* msg)
+{
+ xxxx_either_stub* stubimpl = (xxxx_either_stub*) thisx->stubobj;
+ i_xxxx_server* serverimpl = NULL;
+ etch_session* sessionparams = NULL;
+ opaque_stubhelper stubhelper = NULL;
+ etch_threadpool* threadpool = NULL;
+ int result = -1, session_id = 0;
+ unsigned char async_mode = 0;
+
+ /* get the message type object associated with this message */
+ etch_type* thistype = message_type(msg);
+ ETCH_ASSERT(thistype);
+ ETCH_ASSERT(is_etch_stub(stubimpl));
+ session_id = stubimpl->owner_id;
+
+ do
+ { get_etch_session (thisx->params, session_id, &sessionparams);
+ ETCH_ASSERT (sessionparams);
+ serverimpl = sessionparams->server;
+
+ /* get the message type's stub helper (runner) function.
+ * the stub helper is a pointer to a thread procedure function specific
+ * to the message type, which executes the associated service method.
+ */
+ if (NULL == (stubhelper = etchtype_get_type_stubhelper (thistype)))
+ { char* msgmask = "type '%s' missing stub runner procedure\n";
+ etchlog(ETCHSTUB, ETCHLOG_ERROR, msgmask, thistype->aname);
+ break;
+ }
+
+ /* get the thread pool, if any, appropriate for the
+ * configured mode of execution for this message type
+ */
+ switch(async_mode = etchtype_get_async_mode (thistype))
+ { case ETCH_ASYNCMODE_QUEUED: threadpool = thisx->params->qpool; break;
+ case ETCH_ASYNCMODE_FREE: threadpool = thisx->params->fpool; break;
+ default: threadpool = NULL;
+ }
+
+ /* execute the service method execution function (stub helper),
+ * either on a thread or inline.
+ */
+ result = etchstub_run (thisx, stubhelper, threadpool, serverimpl, whofrom, msg);
+
+ } while(0);
+
+ /* this is the end of the line for the message */
+ msg->destroy(msg);
+
+ return result;
+}
+
+
+/**
+ * etchstub_session_control()
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ */
+int etchstub_session_control (etch_stub* thisx, etch_event* control, objmask* value)
+{
+ int result = -1, is_params_relinquished = FALSE;
+
+ /* a server implementation (xxxx_server_impl) can request notifications of
+ * exceptions and the like by implementing and registering i_objsession
+ * callbacks. the presence of these function pointers in the implementation
+ * serves as the indicator of whether to forward the notification.
+ */
+ i_objsession* impl_callbacks = thisx->impl_callbacks;
+
+ etch_session_control impl_session_control = impl_callbacks?
+ impl_callbacks->_session_control: NULL;
+
+ if (impl_session_control) /* if impl has requested this event, forward it */
+ if (0 == (result = impl_session_control (impl_callbacks, control, value)))
+ is_params_relinquished = TRUE;
+
+ if (0 != result)
+ { char* logmask = impl_session_control? logmask1: logmask2;
+ etchlog(ETCHSTUB, ETCHLOG_ERROR, logmask, scstr);
+ }
+
+ if (!is_params_relinquished)
+ { ETCHOBJ_DESTROY(control);
+ ETCHOBJ_DESTROY(value);
+ }
+
+ return result;
+}
+
+
+/**
+ * etch_etchstub_session_notify()
+ * @param evt event, caller relinquishes.
+ */
+int etchstub_session_notify (etch_stub* thisx, etch_event* evt)
+{
+ const int result = etchstub_session_notify_ex (thisx, thisx->obj, evt);
+ return result;
+}
+
+
+/**
+ * etch_etchstub_session_query()
+ * @param query, caller relinquishes.
+ */
+objmask* etchstub_session_query (etch_stub* thisx, objmask* query)
+{
+ objmask* resultobj = NULL;
+ int is_params_relinquished = FALSE;
+
+ /* a server implementation (xxxx_server_impl) can request notifications of
+ * exceptions and the like by implementing and registering i_objsession
+ * callbacks. the presence of these function pointers in the implementation
+ * serves as the indicator of whether to forward the notification.
+ */
+ i_objsession* impl_callbacks = thisx->impl_callbacks;
+
+ etch_session_query impl_session_query = impl_callbacks?
+ impl_callbacks->_session_query: NULL;
+
+ if (impl_session_query) /* if impl has requested this event, forward it */
+ if (NULL != (resultobj = impl_session_query (impl_callbacks, query)))
+ is_params_relinquished = TRUE;
+
+ if (NULL == resultobj)
+ { char* logmask = impl_session_query? logmask1: logmask2;
+ etchlog(ETCHSTUB, ETCHLOG_ERROR, logmask, sqstr);
+ }
+
+ if (!is_params_relinquished)
+ ETCHOBJ_DESTROY(query);
+
+ return resultobj;
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * constructors, destructor
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * destroy_stub_object()
+ * stub implementation private destructor.
+ * calls back to hand-coded destructor for possible custom deallocation,
+ * then destroys the stub base, including the shared session interface.
+ */
+int destroy_stub_object (void* thisx)
+{
+ xxxx_either_stub* stubobj = NULL;
+ if (NULL == thisx) return -1;
+
+ stubobj = (xxxx_either_stub*) thisx;
+ if (!is_etch_stub(thisx)) return -1;
+
+ if (stubobj->refcount > 0 && --stubobj->refcount > 0) return -1;
+
+ if (!is_etchobj_static_content(stubobj))
+ {
+ if (stubobj->destroyex) /* call back to user dtor */
+ stubobj->destroyex(thisx);
+
+ if (stubobj->stub_base)
+ stubobj->stub_base->destroy(stubobj->stub_base);
+ }
+
+ return destroy_objectex((objmask*)stubobj);
+}
+
+
+/**
+ * is_etch_stub()
+ */
+int is_etch_stub(void* x)
+{
+ int result = FALSE, objtype = x? ((objmask*)x)->obj_type: 0;
+ switch(objtype)
+ { case ETCHTYPEB_STUB: case ETCHTYPEB_CLIENTSTUB: case ETCHTYPEB_SERVERSTUB:
+ result = TRUE;
+ }
+ return result;
+}
+
+
+/**
+ * new_stubimpl_init()
+ * generic stub implementation constructor
+ * @param implobj i_xxxx_client* or i_xxxx_server*.
+ * @param objsize number of bytes in the stub object implementation.
+ * all we know about here is the mask, or object header if you will.
+ * @param stubtype ETCH_STUBTYPE_CLIENT or ETCH_STUBTYPE_SERVER.
+ * @param userdtor a hand-coded callback in the implementation conforming
+ * to signature etch_destructor, for the purpose of freeing any custom
+ * memory allocations.
+ * @param ids the delivery service, caller retains.
+ * @param qp the queued thread pool, optional, caller retains.
+ * @param fp the free thread pool, optional, caller retains.
+ * @param params a etch_server_factory* parameter bundle, caller retains.
+ * if it is always the case that this parameter is present and is a
+ * etch_server_factory*, we can lose the ids, qp, and fp ctor parameters.
+ */
+void* new_stubimpl_init (void* implobj, const int objsize,
+ const unsigned char stubtype, etch_destructor userdtor,
+ i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp,
+ void* params)
+{
+ unsigned short obj_type, class_id;
+ xxxx_either_stub* newstub = NULL;
+
+ switch(stubtype)
+ {
+ case ETCH_STUBTYPE_CLIENT:
+ obj_type = ETCHTYPEB_CLIENTSTUB;
+ class_id = CLASSID_CLIENTSTUB;
+ break;
+ case ETCH_STUBTYPE_SERVER:
+ obj_type = ETCHTYPEB_SERVERSTUB;
+ class_id = CLASSID_SERVERSTUB;
+ break;
+ default: return NULL;
+ }
+
+ newstub = (xxxx_either_stub*) new_object (objsize, obj_type, class_id);
+ newstub->destroy = destroy_stub_object; /* private */
+ newstub->destroyex = userdtor; /* public */
+ newstub->stub_base = new_stub (implobj, stubtype, ids, qp, fp);
+ newstub->stub_base->stubobj = (objmask*) newstub;
+ newstub->stub_base->params = params;
+
+ return newstub;
+}
+
+
+/**
+ * new_clientstub_init()
+ * generic client stub implementation constructor.
+ * @param bytelen number of bytes in the client stub object implementation.
+ * all we know about here is the mask, or object header if you will.
+ * @param dtor a hand-coded callback in the implementation conforming
+ * to signature etch_destructor, for the purpose of freeing any custom
+ * memory allocations.
+ * @param ids the delivery service, caller retains.
+ * @param qp the queued thread pool, optional, caller retains.
+ * @param fp the free thread pool, optional, caller retains.
+ * @param params a etch_server_factory* parameter bundle, caller retains.
+ * if it is always the case that this parameter is present and is a
+ * etch_server_factory*, we can lose the ids, qp, and fp ctor parameters.
+ */
+void* new_clientstub_init (void* implobj, const int bytelen, etch_destructor dtor,
+ i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp, void* params)
+{
+ return new_stubimpl_init (implobj, bytelen, ETCH_STUBTYPE_CLIENT,
+ dtor, ids, qp, fp, params);
+}
+
+
+/**
+ * new_serverstub_init()
+ * generic server stub implementation constructor.
+ * @param bytelen number of bytes in the server stub object implementation.
+ * all we know about here is the mask, or object header if you will.
+ * @param dtor a hand-coded callback in the implementation conforming
+ * to signature etch_destructor, for the purpose of freeing any custom
+ * memory allocations.
+ * @param ids the delivery service, caller retains.
+ * @param qp the queued thread pool, optional, caller retains.
+ * @param fp the free thread pool, optional, caller retains.
+ * @param params a etch_server_factory* parameter bundle, caller retains.
+ * if it is always the case that this parameter is present and is a
+ * etch_server_factory*, we can lose the ids, qp, and fp ctor parameters.
+ */
+void* new_serverstub_init (void* implobj, const int bytelen, etch_destructor dtor,
+ i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp, void* params)
+{
+ return new_stubimpl_init (implobj, bytelen, ETCH_STUBTYPE_SERVER,
+ dtor, ids, qp, fp, params);
+}
+
+
+/* - - - - - - - - - - - - - - - - - - - - -
+ * generic methods on a stub implementation
+ * - - - - - - - - - - - - - - - - - - - - -
+ */
+
+
Added: incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,1028 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/*
+ * etch_threadpool_apr.c
+ * apache portable runtime threadpool code
+ */
+
+#include <assert.h>
+#include "etch_threadpool_apr.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+#include "apr_portable.h"
+
+#define TASK_PRIORITY_SEGS 4
+#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
+
+
+typedef struct apr_thread_pool_task
+{
+ APR_RING_ENTRY(apr_thread_pool_task) link;
+ apr_thread_start_t func;
+ void *param;
+ void *owner;
+ union
+ {
+ apr_byte_t priority;
+ apr_time_t time;
+ } dispatch;
+} apr_thread_pool_task_t;
+
+
+APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
+
+
+struct apr_thread_list_elt
+{
+ APR_RING_ENTRY(apr_thread_list_elt) link;
+ apr_thread_t *thd;
+ volatile void *current_owner;
+ volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
+};
+
+
+APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
+
+
+struct apr_thread_pool
+{
+ apr_pool_t *pool;
+ volatile apr_size_t thd_max;
+ volatile apr_size_t idle_max;
+ volatile apr_interval_time_t idle_wait;
+ volatile apr_size_t thd_cnt;
+ volatile apr_size_t idle_cnt;
+ volatile apr_size_t task_cnt;
+ volatile apr_size_t scheduled_task_cnt;
+ volatile apr_size_t threshold;
+ volatile apr_size_t tasks_run;
+ volatile apr_size_t tasks_high;
+ volatile apr_size_t thd_high;
+ volatile apr_size_t thd_timed_out;
+ struct apr_thread_pool_tasks *tasks;
+ struct apr_thread_pool_tasks *scheduled_tasks;
+ struct apr_thread_list *busy_thds;
+ struct apr_thread_list *idle_thds;
+ apr_thread_mutex_t *lock;
+ apr_thread_mutex_t *cond_lock;
+ apr_thread_cond_t *cond;
+ volatile int terminated;
+ struct apr_thread_pool_tasks *recycled_tasks;
+ struct apr_thread_list *recycled_thds;
+ apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
+};
+
+
+
+static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
+ apr_size_t init_threads,
+ apr_size_t max_threads)
+{
+ apr_status_t rv;
+ int i;
+
+ me->thd_max = max_threads;
+ me->idle_max = init_threads;
+ me->threshold = init_threads / 2;
+ rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
+ me->pool);
+ if (APR_SUCCESS != rv) {
+ return rv;
+ }
+ rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,
+ me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
+ rv = apr_thread_cond_create(&me->cond, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_mutex_destroy(me->lock);
+ apr_thread_mutex_destroy(me->cond_lock);
+ return rv;
+ }
+ me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
+ if (!me->tasks) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
+ me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
+ if (!me->scheduled_tasks) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
+ me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
+ if (!me->recycled_tasks) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
+ me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
+ if (!me->busy_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
+ me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
+ if (!me->idle_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
+ me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
+ if (!me->recycled_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
+ me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
+ me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
+ me->idle_wait = 0;
+ me->terminated = 0;
+ for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
+ me->task_idx[i] = NULL;
+ }
+ goto FINAL_EXIT;
+ CATCH_ENOMEM:
+ rv = APR_ENOMEM;
+ apr_thread_mutex_destroy(me->lock);
+ apr_thread_mutex_destroy(me->cond_lock);
+ apr_thread_cond_destroy(me->cond);
+ FINAL_EXIT:
+ return rv;
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
+{
+ apr_thread_pool_task_t *task = NULL;
+ int seg;
+
+ /* check for scheduled tasks */
+ if (me->scheduled_task_cnt > 0) {
+ task = APR_RING_FIRST(me->scheduled_tasks);
+ assert(task != NULL);
+ assert(task !=
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link));
+ /* if it's time */
+ if (task->dispatch.time <= apr_time_now()) {
+ --me->scheduled_task_cnt;
+ APR_RING_REMOVE(task, link);
+ return task;
+ }
+ }
+ /* check for normal tasks if we're not returning a scheduled task */
+ if (me->task_cnt == 0) {
+ return NULL;
+ }
+
+ task = APR_RING_FIRST(me->tasks);
+ assert(task != NULL);
+ assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
+ --me->task_cnt;
+ seg = TASK_PRIORITY_SEG(task);
+ if (task == me->task_idx[seg]) {
+ me->task_idx[seg] = APR_RING_NEXT(task, link);
+ if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+ apr_thread_pool_task, link)
+ || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+ me->task_idx[seg] = NULL;
+ }
+ }
+ APR_RING_REMOVE(task, link);
+ return task;
+}
+
+
+static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
+{
+ apr_thread_pool_task_t *task = NULL;
+
+ task = APR_RING_FIRST(me->scheduled_tasks);
+ assert(task != NULL);
+ assert(task !=
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link));
+ return task->dispatch.time - apr_time_now();
+}
+
+
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
+ apr_thread_t * t)
+{
+ struct apr_thread_list_elt *elt;
+
+ if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
+ elt = apr_pcalloc(me->pool, sizeof(*elt));
+ if (NULL == elt) {
+ return NULL;
+ }
+ }
+ else {
+ elt = APR_RING_FIRST(me->recycled_thds);
+ APR_RING_REMOVE(elt, link);
+ }
+
+ APR_RING_ELEM_INIT(elt, link);
+ elt->thd = t;
+ elt->current_owner = NULL;
+ elt->state = TH_RUN;
+ return elt;
+}
+
+
+
+/*
+ * The worker thread function. Take a task from the queue and perform it if
+ * there is any. Otherwise, put itself into the idle thread list and waiting
+ * for signal to wake up.
+ * The thread terminate directly by detach and exit when it is asked to stop
+ * after finishing a task. Otherwise, the thread should be in idle thread list
+ * and should be joined.
+ */
+static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_pool_t *me = param;
+ apr_thread_pool_task_t *task = NULL;
+ apr_interval_time_t wait;
+ struct apr_thread_list_elt *elt;
+
+ apr_thread_mutex_lock(me->lock);
+ elt = elt_new(me, t);
+ if (!elt) {
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_exit(t, APR_ENOMEM);
+ }
+
+ while (!me->terminated && elt->state != TH_STOP) {
+ /* Test if not new element, it is awakened from idle */
+ if (APR_RING_NEXT(elt, link) != elt) {
+ --me->idle_cnt;
+ APR_RING_REMOVE(elt, link);
+ }
+
+ APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
+ task = pop_task(me);
+ while (NULL != task && !me->terminated) {
+ ++me->tasks_run;
+ elt->current_owner = task->owner;
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
+ task->func(t, task->param);
+ apr_thread_mutex_lock(me->lock);
+ APR_RING_INSERT_TAIL(me->recycled_tasks, task,
+ apr_thread_pool_task, link);
+ elt->current_owner = NULL;
+ if (TH_STOP == elt->state) {
+ break;
+ }
+ task = pop_task(me);
+ }
+ assert(NULL == elt->current_owner);
+ if (TH_STOP != elt->state)
+ APR_RING_REMOVE(elt, link);
+
+ /* Test if a busy thread been asked to stop, which is not joinable */
+ if ((me->idle_cnt >= me->idle_max
+ && !(me->scheduled_task_cnt && 0 >= me->idle_max)
+ && !me->idle_wait)
+ || me->terminated || elt->state != TH_RUN) {
+ --me->thd_cnt;
+ if ((TH_PROBATION == elt->state) && me->idle_wait)
+ ++me->thd_timed_out;
+ APR_RING_INSERT_TAIL(me->recycled_thds, elt,
+ apr_thread_list_elt, link);
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_detach(t);
+ apr_thread_exit(t, APR_SUCCESS);
+ return NULL; /* should not be here, safe net */
+ }
+
+ /* busy thread become idle */
+ ++me->idle_cnt;
+ APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
+
+ /*
+ * If there is a scheduled task, always scheduled to perform that task.
+ * Since there is no guarantee that current idle threads are scheduled
+ * for next scheduled task.
+ */
+ if (me->scheduled_task_cnt)
+ wait = waiting_time(me);
+ else if (me->idle_cnt > me->idle_max) {
+ wait = me->idle_wait;
+ elt->state = TH_PROBATION;
+ }
+ else
+ wait = -1;
+
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_mutex_lock(me->cond_lock);
+ if (wait >= 0) {
+ rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
+ }
+ else {
+ rv = apr_thread_cond_wait(me->cond, me->cond_lock);
+ }
+ apr_thread_mutex_unlock(me->cond_lock);
+ apr_thread_mutex_lock(me->lock);
+ }
+
+ /* idle thread been asked to stop, will be joined */
+ --me->thd_cnt;
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_exit(t, APR_SUCCESS);
+ return NULL; /* should not be here, safe net */
+}
+
+static apr_status_t thread_pool_cleanup(void *me)
+{
+ apr_thread_pool_t *_self = me;
+
+ _self->terminated = 1;
+ etch_apr_thread_pool_idle_max_set(_self, 0);
+ while (_self->thd_cnt) {
+ apr_sleep(20 * 1000); /* spin lock with 20 ms */
+ }
+ apr_thread_mutex_destroy(_self->lock);
+ apr_thread_mutex_destroy(_self->cond_lock);
+ apr_thread_cond_destroy(_self->cond);
+ return APR_SUCCESS;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_create(apr_thread_pool_t ** me,
+ apr_size_t init_threads,
+ apr_size_t max_threads,
+ apr_pool_t * pool)
+{
+ apr_thread_t *t;
+ apr_status_t rv = APR_SUCCESS;
+
+ *me = apr_pcalloc(pool, sizeof(**me));
+ if (!*me) {
+ return APR_ENOMEM;
+ }
+
+ (*me)->pool = pool;
+
+ rv = thread_pool_construct(*me, init_threads, max_threads);
+ if (APR_SUCCESS != rv) {
+ *me = NULL;
+ return rv;
+ }
+ apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
+ apr_pool_cleanup_null);
+
+ while (init_threads) {
+ rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
+ if (APR_SUCCESS != rv) {
+ break;
+ }
+ ++(*me)->thd_cnt;
+ if ((*me)->thd_cnt > (*me)->thd_high)
+ (*me)->thd_high = (*me)->thd_cnt;
+ --init_threads;
+ }
+
+ return rv;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_destroy(apr_thread_pool_t * me)
+{
+ return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
+}
+
+
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
+ apr_thread_start_t func,
+ void *param, apr_byte_t priority,
+ void *owner, apr_time_t time)
+{
+ apr_thread_pool_task_t *t;
+
+ if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
+ t = apr_pcalloc(me->pool, sizeof(*t));
+ if (NULL == t) {
+ return NULL;
+ }
+ }
+ else {
+ t = APR_RING_FIRST(me->recycled_tasks);
+ APR_RING_REMOVE(t, link);
+ }
+
+ APR_RING_ELEM_INIT(t, link);
+ t->func = func;
+ t->param = param;
+ t->owner = owner;
+ if (time > 0) {
+ t->dispatch.time = apr_time_now() + time;
+ }
+ else {
+ t->dispatch.priority = priority;
+ }
+ return t;
+}
+
+
+
+/*
+ * Test it the task is the only one within the priority segment.
+ * If it is not, return the first element with same or lower priority.
+ * Otherwise, add the task into the queue and return NULL.
+ *
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
+ apr_thread_pool_task_t * const t)
+{
+ int seg;
+ int next;
+ apr_thread_pool_task_t *t_next;
+
+ seg = TASK_PRIORITY_SEG(t);
+ if (me->task_idx[seg]) {
+ assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+ me->task_idx[seg]);
+ t_next = me->task_idx[seg];
+ while (t_next->dispatch.priority > t->dispatch.priority) {
+ t_next = APR_RING_NEXT(t_next, link);
+ if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
+ t_next) {
+ return t_next;
+ }
+ }
+ return t_next;
+ }
+
+ for (next = seg - 1; next >= 0; next--) {
+ if (me->task_idx[next]) {
+ APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
+ break;
+ }
+ }
+ if (0 > next) {
+ APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
+ }
+ me->task_idx[seg] = t;
+ return NULL;
+}
+
+
+/*
+ * schedule a task to run in "time" microseconds. Find the spot in the ring where
+ * the time fits. Adjust the short_time so the thread wakes up when the time is reached.
+ */
+static apr_status_t schedule_task(apr_thread_pool_t *me,
+ apr_thread_start_t func, void *param,
+ void *owner, apr_interval_time_t time)
+{
+ apr_thread_pool_task_t *t;
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_t *thd;
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_mutex_lock(me->lock);
+
+ t = task_new(me, func, param, 0, owner, time);
+ if (NULL == t) {
+ apr_thread_mutex_unlock(me->lock);
+ return APR_ENOMEM;
+ }
+ t_loc = APR_RING_FIRST(me->scheduled_tasks);
+ while (NULL != t_loc) {
+ /* if the time is less than the entry insert ahead of it */
+ if (t->dispatch.time < t_loc->dispatch.time) {
+ ++me->scheduled_task_cnt;
+ APR_RING_INSERT_BEFORE(t_loc, t, link);
+ break;
+ }
+ else {
+ t_loc = APR_RING_NEXT(t_loc, link);
+ if (t_loc ==
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link)) {
+ ++me->scheduled_task_cnt;
+ APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
+ apr_thread_pool_task, link);
+ break;
+ }
+ }
+ }
+ /* there should be at least one thread for scheduled tasks */
+ if (0 == me->thd_cnt) {
+ rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+ if (APR_SUCCESS == rv) {
+ ++me->thd_cnt;
+ if (me->thd_cnt > me->thd_high)
+ me->thd_high = me->thd_cnt;
+ }
+ }
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_mutex_lock(me->cond_lock);
+ apr_thread_cond_signal(me->cond);
+ apr_thread_mutex_unlock(me->cond_lock);
+ return rv;
+}
+
+
+
+static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
+ void *param, apr_byte_t priority, int push,
+ void *owner)
+{
+ apr_thread_pool_task_t *t;
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_t *thd;
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(me->lock);
+
+ t = task_new(me, func, param, priority, owner, 0);
+ if (NULL == t) {
+ apr_thread_mutex_unlock(me->lock);
+ return APR_ENOMEM;
+ }
+
+ t_loc = add_if_empty(me, t);
+ if (NULL == t_loc) {
+ goto FINAL_EXIT;
+ }
+
+ if (push) {
+ while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+ t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
+ t_loc = APR_RING_NEXT(t_loc, link);
+ }
+ }
+ APR_RING_INSERT_BEFORE(t_loc, t, link);
+ if (!push) {
+ if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
+ me->task_idx[TASK_PRIORITY_SEG(t)] = t;
+ }
+ }
+
+ FINAL_EXIT:
+ me->task_cnt++;
+ if (me->task_cnt > me->tasks_high)
+ me->tasks_high = me->task_cnt;
+ if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
+ me->task_cnt > me->threshold)) {
+ rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+ if (APR_SUCCESS == rv) {
+ ++me->thd_cnt;
+ if (me->thd_cnt > me->thd_high)
+ me->thd_high = me->thd_cnt;
+ }
+ }
+ apr_thread_mutex_unlock(me->lock);
+
+ apr_thread_mutex_lock(me->cond_lock);
+ apr_thread_cond_signal(me->cond);
+ apr_thread_mutex_unlock(me->cond_lock);
+
+ return rv;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_push(apr_thread_pool_t *me,
+ apr_thread_start_t func,
+ void *param,
+ apr_byte_t priority,
+ void *owner)
+{
+ return add_task(me, func, param, priority, 1, owner);
+}
+
+
+
+apr_status_t etch_apr_thread_pool_schedule(apr_thread_pool_t *me,
+ apr_thread_start_t func,
+ void *param,
+ apr_interval_time_t time,
+ void *owner)
+{
+ return schedule_task(me, func, param, owner, time);
+}
+
+
+
+apr_status_t etch_apr_thread_pool_top(apr_thread_pool_t *me,
+ apr_thread_start_t func,
+ void *param,
+ apr_byte_t priority,
+ void *owner)
+{
+ return add_task(me, func, param, priority, 0, owner);
+}
+
+
+
+static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
+ void *owner)
+{
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_pool_task_t *next;
+
+ t_loc = APR_RING_FIRST(me->scheduled_tasks);
+ while (t_loc !=
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link)) {
+ next = APR_RING_NEXT(t_loc, link);
+ /* if this is the owner remove it */
+ if (t_loc->owner == owner) {
+ --me->scheduled_task_cnt;
+ APR_RING_REMOVE(t_loc, link);
+ }
+ t_loc = next;
+ }
+ return APR_SUCCESS;
+}
+
+
+
+static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
+{
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_pool_task_t *next;
+ int seg;
+
+ t_loc = APR_RING_FIRST(me->tasks);
+ while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
+ next = APR_RING_NEXT(t_loc, link);
+ if (t_loc->owner == owner) {
+ --me->task_cnt;
+ seg = TASK_PRIORITY_SEG(t_loc);
+ if (t_loc == me->task_idx[seg]) {
+ me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
+ if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+ apr_thread_pool_task,
+ link)
+ || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+ me->task_idx[seg] = NULL;
+ }
+ }
+ APR_RING_REMOVE(t_loc, link);
+ }
+ t_loc = next;
+ }
+ return APR_SUCCESS;
+}
+
+
+
+static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
+{
+#ifndef NDEBUG
+ apr_os_thread_t *os_thread;
+#endif
+ struct apr_thread_list_elt *elt;
+ apr_thread_mutex_lock(me->lock);
+ elt = APR_RING_FIRST(me->busy_thds);
+ while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
+ if (elt->current_owner != owner) {
+ elt = APR_RING_NEXT(elt, link);
+ continue;
+ }
+#ifndef NDEBUG
+ /* make sure the thread is not the one calling tasks_cancel */
+ apr_os_thread_get(&os_thread, elt->thd);
+#ifdef WIN32
+ /* hack for apr win32 bug */
+ assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
+#else
+ assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
+#endif
+#endif
+ while (elt->current_owner == owner) {
+ apr_thread_mutex_unlock(me->lock);
+ apr_sleep(200 * 1000);
+ apr_thread_mutex_lock(me->lock);
+ }
+ elt = APR_RING_FIRST(me->busy_thds);
+ }
+ apr_thread_mutex_unlock(me->lock);
+ return;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
+ void *owner)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(me->lock);
+ if (me->task_cnt > 0) {
+ rv = remove_tasks(me, owner);
+ }
+ if (me->scheduled_task_cnt > 0) {
+ rv = remove_scheduled_tasks(me, owner);
+ }
+ apr_thread_mutex_unlock(me->lock);
+ wait_on_busy_threads(me, owner);
+
+ return rv;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_tasks_count(apr_thread_pool_t *me)
+{
+ return me->task_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
+{
+ return me->scheduled_task_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_threads_count(apr_thread_pool_t *me)
+{
+ return me->thd_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_busy_count(apr_thread_pool_t *me)
+{
+ return me->thd_cnt - me->idle_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_idle_count(apr_thread_pool_t *me)
+{
+ return me->idle_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
+{
+ return me->tasks_run;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
+{
+ return me->tasks_high;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
+{
+ return me->thd_high;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
+{
+ return me->thd_timed_out;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
+{
+ return me->idle_max;
+}
+
+
+apr_interval_time_t etch_apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
+{
+ return me->idle_wait;
+}
+
+
+
+/*
+ * This function stop extra idle threads to the cnt.
+ * @return the number of threads stopped
+ * NOTE: There could be busy threads become idle during this function
+ */
+static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
+ apr_size_t *cnt, int idle)
+{
+ struct apr_thread_list *thds;
+ apr_size_t n, n_dbg, i;
+ struct apr_thread_list_elt *head, *tail, *elt;
+
+ apr_thread_mutex_lock(me->lock);
+ if (idle) {
+ thds = me->idle_thds;
+ n = me->idle_cnt;
+ }
+ else {
+ thds = me->busy_thds;
+ n = me->thd_cnt - me->idle_cnt;
+ }
+ if (n <= *cnt) {
+ apr_thread_mutex_unlock(me->lock);
+ *cnt = 0;
+ return NULL;
+ }
+ n -= *cnt;
+
+ head = APR_RING_FIRST(thds);
+ for (i = 0; i < *cnt; i++) {
+ head = APR_RING_NEXT(head, link);
+ }
+ tail = APR_RING_LAST(thds);
+ if (idle) {
+ APR_RING_UNSPLICE(head, tail, link);
+ me->idle_cnt = *cnt;
+ }
+
+ n_dbg = 0;
+ for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+ elt->state = TH_STOP;
+ n_dbg++;
+ }
+ elt->state = TH_STOP;
+ n_dbg++;
+ assert(n == n_dbg);
+ *cnt = n;
+
+ apr_thread_mutex_unlock(me->lock);
+
+ APR_RING_PREV(head, link) = NULL;
+ APR_RING_NEXT(tail, link) = NULL;
+ return head;
+}
+
+
+
+static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
+{
+ apr_size_t n_dbg;
+ struct apr_thread_list_elt *elt, *head, *tail;
+ apr_status_t rv;
+
+ elt = trim_threads(me, &cnt, 1);
+
+ apr_thread_mutex_lock(me->cond_lock);
+ apr_thread_cond_broadcast(me->cond);
+ apr_thread_mutex_unlock(me->cond_lock);
+
+ n_dbg = 0;
+ if (NULL != (head = elt)) {
+ while (elt) {
+ tail = elt;
+ apr_thread_join(&rv, elt->thd);
+ elt = APR_RING_NEXT(elt, link);
+ ++n_dbg;
+ }
+ apr_thread_mutex_lock(me->lock);
+ APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
+ apr_thread_list_elt, link);
+ apr_thread_mutex_unlock(me->lock);
+ }
+ assert(cnt == n_dbg);
+
+ return cnt;
+}
+
+
+/* don't join on busy threads for performance reasons, who knows how long will
+ * the task takes to perform
+ */
+static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
+{
+ trim_threads(me, &cnt, 0);
+ return cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
+ apr_size_t cnt)
+{
+ me->idle_max = cnt;
+ cnt = trim_idle_threads(me, cnt);
+ return cnt;
+}
+
+
+
+apr_interval_time_t etch_apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
+ apr_interval_time_t timeout)
+{
+ apr_interval_time_t oldtime;
+
+ oldtime = me->idle_wait;
+ me->idle_wait = timeout;
+
+ return oldtime;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
+{
+ return me->thd_max;
+}
+
+
+/*
+ * This function stop extra working threads to the new limit.
+ * NOTE: There could be busy threads become idle during this function
+ */
+apr_size_t etch_apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
+ apr_size_t cnt)
+{
+ unsigned int n;
+
+ me->thd_max = cnt;
+ if (0 == cnt || me->thd_cnt <= cnt) {
+ return 0;
+ }
+
+ n = (unsigned) me->thd_cnt - cnt;
+ if (n >= me->idle_cnt) {
+ trim_busy_threads(me, n - me->idle_cnt);
+ trim_idle_threads(me, 0);
+ }
+ else {
+ trim_idle_threads(me, me->idle_cnt - n);
+ }
+ return n;
+}
+
+
+apr_size_t etch_apr_thread_pool_threshold_get(apr_thread_pool_t *me)
+{
+ return me->threshold;
+}
+
+
+apr_size_t etch_apr_thread_pool_threshold_set(apr_thread_pool_t *me,
+ apr_size_t val)
+{
+ apr_size_t ov;
+
+ ov = me->threshold;
+ me->threshold = val;
+ return ov;
+}
+
+
+apr_status_t etch_apr_thread_pool_task_owner_get(apr_thread_t *thd,
+ void **owner)
+{
+ apr_status_t rv;
+ apr_thread_pool_task_t *task;
+ void *data;
+
+ rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ task = data;
+ if (!task) {
+ *owner = NULL;
+ return APR_BADARG;
+ }
+
+ *owner = task->owner;
+ return APR_SUCCESS;
+}
+
+
+/* vim: set ts=4 sw=4 et cin tw=80: */
Added: incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,130 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_transportint.c
+ * transport interface
+ */
+
+#include "etch_transportint.h"
+#include "etch_sessionint.h"
+#include "etch_global.h"
+
+int etchtransport_def_transport_control (void*, void*, void*);
+int etchtransport_def_transport_notify (void*, void*);
+objmask* etchtransport_def_transport_query (void*, void*);
+
+void* etchtransport_def_get_session(void* thisx);
+void etchtransport_def_set_session(void* thisx, void* session);
+
+
+/**
+ * new_default_transport_interface
+ * return a transport interface populated with defaults for virtuals.
+ * caller owns returned object, not an etch object, use etch_free() to destroy.
+ */
+i_transport* new_default_transport_interface(void* thisx)
+{
+ i_transport* newi = etch_malloc(sizeof(i_transport), ETCHTYPEB_RAWOBJECT);
+
+ newi->transport_control = etchtransport_def_transport_control;
+ newi->transport_notify = etchtransport_def_transport_notify;
+ newi->transport_query = etchtransport_def_transport_query;
+ newi->get_session = etchtransport_def_get_session;
+ newi->set_session = etchtransport_def_set_session;
+ newi->thisx = thisx;
+ return newi;
+}
+
+
+/**
+ * new_transport_interface
+ * return a transport interface populated with specified virtuals.
+ * caller owns returned object, not an etch object, use etch_free() to destroy.
+ */
+i_transport* new_transport_interface(void* thisx,
+ etch_transport_control sc, etch_transport_notify sn, etch_transport_query sq)
+{
+ i_transport* newi = new_default_transport_interface(thisx);
+
+ if (sc) newi->transport_control = sc;
+ if (sn) newi->transport_notify = sn;
+ if (sq) newi->transport_query = sq;
+ return newi;
+}
+
+
+/**
+ * new_transport_interface_ex
+ * return a transport interface populated with specified virtuals.
+ * caller owns returned object, not an etch object, use etch_free() to destroy.
+ */
+i_transport* new_transport_interface_ex(void* thisx,
+ etch_transport_control sc, etch_transport_notify sn, etch_transport_query sq,
+ etch_transport_get_session gs, etch_transport_set_session ss)
+{
+ i_transport* newi = new_transport_interface(thisx, sc, sn, sq);
+
+ if (gs) newi->get_session = gs;
+ if (ss) newi->set_session = ss;
+ return newi;
+}
+
+
+/**
+ * clone_transport()
+ */
+i_transport* clone_transport(void* thisx, const i_transport* thattransport)
+{
+ i_transport* newtransport
+ = thattransport? new_default_transport_interface(thisx): NULL;
+
+ if (newtransport)
+ memcpy(newtransport, thattransport, sizeof(i_transport));
+
+ return newtransport;
+}
+
+
+int etchtransport_def_transport_control (void* obj, void* evt, void* v)
+{
+ return -1;
+}
+
+
+int etchtransport_def_transport_notify (void* obj, void* evt)
+{
+ return -1;
+}
+
+
+objmask* etchtransport_def_transport_query (void* obj, void* query)
+{
+ return NULL;
+}
+
+
+void* etchtransport_def_get_session(void* thisx)
+{
+ return NULL;
+}
+
+
+void etchtransport_def_set_session(void* thisx, void* session)
+{
+}
\ No newline at end of file
Added: incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,381 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * test_allocator.c -- test the debug allocator
+ */
+#include "apr_time.h" /* some apr must be included first */
+#include "etchthread.h"
+#include <tchar.h>
+#include <stdio.h>
+#include <conio.h>
+#include "etchobj.h"
+#include "etch_global.h"
+
+#include "cunit.h"
+#include "basic.h"
+#include "automated.h"
+
+int apr_setup(void);
+int apr_teardown(void);
+int this_setup();
+int this_teardown();
+apr_pool_t* g_apr_mempool;
+const char* pooltag = "etchpool";
+
+
+/* - - - - - - - - - - - - - -
+ * unit test infrastructure
+ * - - - - - - - - - - - - - -
+ */
+
+int init_suite(void)
+{
+ apr_setup();
+ etch_runtime_init(TRUE);
+ return this_setup();
+}
+
+int clean_suite(void)
+{
+ this_teardown();
+ etch_runtime_cleanup(0,0); /* free memtable and cache etc */
+ apr_teardown();
+ return 0;
+}
+
+int g_is_automated_test, g_bytes_allocated;
+
+#define IS_DEBUG_CONSOLE FALSE
+
+/*
+ * apr_setup()
+ * establish apache portable runtime environment
+ */
+int apr_setup(void)
+{
+ int result = apr_initialize();
+ if (result == 0)
+ { result = etch_apr_init();
+ g_apr_mempool = etch_apr_mempool;
+ }
+ if (g_apr_mempool)
+ apr_pool_tag(g_apr_mempool, pooltag);
+ else result = -1;
+ return result;
+}
+
+/*
+ * apr_teardown()
+ * free apache portable runtime environment
+ */
+int apr_teardown(void)
+{
+ if (g_apr_mempool)
+ apr_pool_destroy(g_apr_mempool);
+ g_apr_mempool = NULL;
+ apr_terminate();
+ return 0;
+}
+
+int this_setup()
+{
+ etch_apr_mempool = g_apr_mempool;
+ return 0;
+}
+
+int this_teardown()
+{
+ return 0;
+}
+
+
+#define PAYLOADSIG 0xf00f00
+#define NUMITEMS 3
+
+typedef struct payload
+{
+ int count;
+ int signature;
+} payload;
+
+
+/*
+ * This subtest test our ability to build a hashtable each key of which is a
+ * pointer to that key's value, in other words, the hashtable stores a void**
+ * as they key. This essentially models a debug allocator.
+ */
+void malloc_test(void)
+{
+ /* These are what is stored in the hash table for each hash, two pointers.
+ To clarify, *pointers* are what is copied into the hashtable, not values.
+ So it follows that if I want to use a pointer as a hash key, what I must
+ store in the hashtable as the key is a pointer to that pointer.
+ */
+ char **pkey = 0; void *pval = 0;
+
+ payload** allocations[NUMITEMS];
+ int i, testresult = 0, count = 0;
+ etch_hashtable* myhash;
+ etch_hashitem hashbucket;
+ etch_hashitem* myentry = &hashbucket;
+
+ is_memtable_instance = TRUE;
+ myhash = new_hashtable(16); /* create hash table */
+ is_memtable_instance = FALSE;
+ CU_ASSERT_PTR_NOT_NULL_FATAL(myhash);
+
+ /* insert NUMITEMS allocations into the tracking table
+ */
+ for(i = 0; i < NUMITEMS; i++)
+ {
+ int result = 0;
+ void* pkey = 0;
+ payload* pval = 0;
+
+ pkey = malloc(sizeof(void*)); /* allocate the key* */
+ pval = malloc(sizeof(payload)); /* allocate the value */
+ pval->count = i; pval->signature = PAYLOADSIG; /* debug info */
+ memcpy(pkey, &pval, sizeof(void*)); /* copy value* into key */
+ allocations[i] = pkey ; /* save for iterating */
+
+ result = myhash->vtab->insert(myhash->realtable, pkey, sizeof(void*), pval, sizeof(payload), 0, 0);
+ CU_ASSERT_EQUAL(result,0);
+ }
+
+ /* ensure there are NUMITEMS entries in the tracking table*/
+ count = myhash->vtab->count(myhash->realtable, 0, 0);
+ CU_ASSERT_EQUAL_FATAL(count,NUMITEMS);
+
+ for(i = 0; i < NUMITEMS; i++)
+ {
+ int result = 0;
+ payload* pval = 0;
+ payload** pkey = (payload**) &allocations[i];
+
+ result = myhash->vtab->find(myhash->realtable, *pkey, sizeof(void*), NULL, &myentry);
+ CU_ASSERT_EQUAL(result,0);
+
+ pval = (payload*)myentry->value;
+ CU_ASSERT_EQUAL(pval->signature, PAYLOADSIG);
+ CU_ASSERT_EQUAL(pval->count, i);
+ }
+
+ /*
+ * Clear the hashtable, while doing so ask hashtable to free memory at the
+ * and value pointers. Since we have individually allocated both keys (4 bytes
+ * each, each a void**), and values (sizeof(payload) bytes each), we can do so.
+ * If we do not crash, this mini-test succeeds.
+ * CUnit note: there is no easy way to test if this succeeds, since the hashtable
+ * no longer exists if successful, and an attempt to reference it would elicit
+ * an exception.
+ */
+ myhash->vtab->clear(myhash->realtable, TRUE, TRUE, 0, 0);
+
+ g_bytes_allocated = etch_showmem(0,0); /* verify all memory freed */
+ CU_ASSERT_EQUAL(g_bytes_allocated, 0);
+ memtable_clear(); /* start fresh for next test */
+}
+
+
+/*
+ * This subtest tests our implemented debug allocator.
+ */
+void etch_malloc_test(void)
+{
+ /* These are what is stored in the hash table for each hash, two pointers.
+ To clarify, *pointers* are what is copied into the hashtable, not values.
+ So it follows that if I want to use a pointer as a hash key, what I must
+ store in the hashtable as the key is a pointer to that pointer.
+ */
+
+ payload* allocations[NUMITEMS];
+ int i = 0, testresult = 0, count = 0;
+ etch_hashitem hashbucket;
+ etch_hashitem* myentry = &hashbucket;
+
+ /* insert NUMITEMS allocations into the tracking table
+ */
+ for(i = 0; i < NUMITEMS; i++)
+ {
+ void* pmem = 0;
+ payload* pval = 0;
+
+ pval = etch_malloc(sizeof(payload), 0); /* allocate value, get key */
+ CU_ASSERT_PTR_NOT_NULL_FATAL(pval);
+
+ pval->count = i; pval->signature = PAYLOADSIG; /* debug info */
+ allocations[i] = pval ; /* save for iterating */
+ }
+
+ /* ensure there are NUMITEMS entries in the tracking table */
+ CU_ASSERT_PTR_NOT_NULL_FATAL(memtable);
+ count = memtable->vtab->count(memtable->realtable, 0, 0);
+ CU_ASSERT_EQUAL_FATAL(count, NUMITEMS);
+
+ for(i = 0; i < NUMITEMS; i++)
+ {
+ int n, result;
+ payload* pval = (payload*) allocations[i];
+ n = pval->count;
+
+ result = etch_free(pval);
+ CU_ASSERT_EQUAL(result, 0);
+ }
+
+ /* ensure there are zero entries in the tracking table */
+ CU_ASSERT_PTR_NOT_NULL_FATAL(memtable);
+ CU_ASSERT_EQUAL_FATAL(memtable->vtab->count(memtable->realtable, 0, 0), 0);
+
+ g_bytes_allocated = etch_showmem(0,0); /* verify all memory freed */
+ CU_ASSERT_EQUAL(g_bytes_allocated, 0);
+ memtable_clear(); /* start fresh for next test */
+}
+
+
+/*
+ * This subtest tests that the debug allocator reports un-freed memory as expected.
+ */
+void negative_etch_malloc_test(void)
+{
+ struct
+ { double d;
+ } EIGHTBYTES;
+ struct
+ { char x[496];
+ int n;
+ } FIVEHUNDREDBYTES;
+
+ void *p1=0, *p2=0;
+ int bytes_allocated = 0;
+ memtable = NULL; /* ensure not dangling from a prior test */
+
+ p1 = etch_malloc(sizeof(EIGHTBYTES), 0);
+ p2 = etch_malloc(sizeof(FIVEHUNDREDBYTES), 0);
+
+ bytes_allocated = etch_showmem(0,0); /* get leaks - don't free them */
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES) + sizeof(FIVEHUNDREDBYTES));
+
+ etch_free(p2);
+ bytes_allocated = etch_showmem(0,0);
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+
+ etch_free(p1);
+
+ g_bytes_allocated = etch_showmem(0,0); /* verify all memory freed */
+ CU_ASSERT_EQUAL(g_bytes_allocated, 0);
+ memtable_clear(); /* start fresh for next test */
+}
+
+
+/*
+ * This subtest tests that the debug allocator reports un-freed memory as expected.
+ */
+void etch_realloc_test(void)
+{
+ struct
+ { double d;
+ } EIGHTBYTES;
+ struct
+ { char x[496];
+ int n;
+ } FIVEHUNDREDBYTES;
+
+ void *p1=0, *p2=0;
+ int bytes_allocated = 0;
+
+ /* same as malloc */
+ p1 = etch_realloc(NULL, sizeof(EIGHTBYTES), 0);
+ CU_ASSERT_PTR_NOT_NULL(p1);
+ bytes_allocated = etch_showmem(0,0); /* get leaks - don't free them */
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+
+ /* same as free */
+ p2 = etch_realloc(p1, 0, 0);
+ bytes_allocated = etch_showmem(0,0);
+ CU_ASSERT_EQUAL(bytes_allocated, 0);
+
+ /* now expand the memory */
+ p1 = etch_realloc(NULL, sizeof(EIGHTBYTES), 0);
+ CU_ASSERT_PTR_NOT_NULL(p1);
+ bytes_allocated = etch_showmem(0,0);
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+ *((long *) p1) = 1234;
+
+ p2 = etch_realloc(p1, sizeof(FIVEHUNDREDBYTES), 0);
+
+ CU_ASSERT_PTR_NOT_NULL(p2);
+ bytes_allocated = etch_showmem(0,0);
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(FIVEHUNDREDBYTES));
+ CU_ASSERT( (*( (long *) p2) ) == 1234 );
+
+ etch_free(p2);
+
+
+ /* now shrink memory */
+ p1 = etch_realloc(NULL, sizeof(FIVEHUNDREDBYTES), 0);
+ CU_ASSERT_PTR_NOT_NULL(p1);
+ bytes_allocated = etch_showmem(0,0);
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(FIVEHUNDREDBYTES));
+ *((long *) p1) = 1234;
+
+ p2 = etch_realloc(p1, sizeof(EIGHTBYTES), 0);
+
+ CU_ASSERT_PTR_NOT_NULL(p2);
+ bytes_allocated = etch_showmem(0,0);
+ CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+ CU_ASSERT( (*( (long *) p2) ) == 1234 );
+
+ etch_free(p2);
+
+ g_bytes_allocated = etch_showmem(0,0); /* verify all memory freed */
+ CU_ASSERT_EQUAL(g_bytes_allocated, 0);
+ memtable_clear(); /* start fresh for next test */
+}
+
+
+/**
+ * main
+ */
+int _tmain(int argc, _TCHAR* argv[])
+{
+ char c=0;
+ CU_pSuite pSuite = NULL;
+ g_is_automated_test = argc > 1 && 0 != wcscmp(argv[1], L"-a");
+ if (CUE_SUCCESS != CU_initialize_registry()) return CU_get_error();
+
+ CU_set_output_filename("../test_allocator");
+ pSuite = CU_add_suite("suite_allocator", init_suite, clean_suite);
+
+ CU_add_test(pSuite, "hashtable pointer key test", malloc_test);
+ CU_add_test(pSuite, "debug realloc test", etch_realloc_test);
+ CU_add_test(pSuite, "debug allocator test", etch_malloc_test);
+ CU_add_test(pSuite, "negative debug allocator test", negative_etch_malloc_test);
+
+ if (g_is_automated_test)
+ CU_automated_run_tests();
+ else
+ { CU_basic_set_mode(CU_BRM_VERBOSE);
+ CU_basic_run_tests();
+ }
+
+ if (!g_is_automated_test) { printf("any key ..."); while(!c) c = _getch(); wprintf(L"\n"); }
+ CU_cleanup_registry();
+ return CU_get_error();
+}
+