Posted to commits@etch.apache.org by jd...@apache.org on 2009/04/22 19:25:51 UTC

svn commit: r767594 [21/43] - in /incubator/etch/trunk/binding-c/runtime/c: ./ ext/ ext/hashtab/ ext/lib/ inc/ lib/ project/ project/$etchstop/ project/bin/ project/etch/ project/logcli/ project/logsrv/ project/notes/ project/test/ project/test/logcli/...

Added: incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/support/etch_stub.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,876 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_stub.c
+ * contains the base runtime code for either type of stub.
+ *
+ * the generated stub_xxxx_server will define the thread procedures for each message
+ * type in the service. for each such type, it will set the virtual stub helper in
+ * the type object to point to the corresponding thread function. each such thread procedure
+ * will call the user-coded calculation in the impl_xxxx_server for that message  
+ * type; for example, for an "add" message, it will get the fields from the service
+ * value factory which are the arguments to the calculation, call the calculation in
+ * the server impl, insert the result of the calculation into the reply message, and
+ * call the delivery service transport_message to send the reply back to the sender.  
+ */
+
+#include "apr_network_io.h"
+#include "apr_general.h"
+
+#include "etch_svcobj_masks.h"
+#include "etch_stub.h"
+#include "etch_global.h"
+#include "etchexcp.h"    
+#include "etchlog.h"
+
+int etchstub_session_notify_ex (void*, void*, etch_event*);
+int etchstub_session_message(etch_stub*, etch_who*, etch_message*);
+int etchstub_session_control(etch_stub*, etch_event*, objmask*);
+int etchstub_session_notify (etch_stub*, etch_event*);
+objmask* etchstub_session_query (etch_stub*, objmask*); 
+i_objsession* etchstub_get_session_callbacks_from (void* _obj);
+
+char* ETCHSTUB = "STUB";
+char* scstr = "session control", *snstr = "session notify", *sqstr = "session query";
+char* logmask1 = "%s not routed to client\n", *logmask2 = "%s not registered\n";
+ 
+
+
+#if(0)
+____________________________________________________________________________________
+the inheritance hierarchy becomes indefinite at this point. a stub should be able
+to query a service object for a particular interface, down through its inheritance
+chain. for example, we need to find who has implemented obj_session and call methods
+on it. a use case is a custom transport implemented as a dynamically loaded dll.
+the stub needs therefore to be able to call into objects it has never seen before. 
+
+for the present we will make some assumptions as to the inheritance hierarchy, that
+being that given a service interface, we know its inheritance hierarchy. following
+is a diagram illustrating the inheritance of a service interface named XXXX. 
+all arrows direct down unless indicated by an arrowhead. (e) indicates extends,
+(i) indicates implements. this diagram is upside-down to the way we normally see
+the hierarchy.
+                                     (obj)
+                                       |
+                                     XXXX  <- - - - - - - - - - (stub)
+                                     |  |
+                       (e)  ---------    ----------  (e)
+                           |                       |
+                      XXXX_server            remote_XXXX
+                           |      \ (i)            | (e)
+                       (i) |        ------> remote_XXXX_server             
+  obj_session <--  base_XXXX_server      
+                       (e) |
+                   impl_XXXX_server
+____________________________________________________________________________________
+#endif
+
+
+#if(0)
+
+IMPL_XXXX_SERVER
+   - ctor(REMOTE_XXXX_CLIENT)
+   - usermethod_1_impl(); ...
+   - usermethod_n_impl();
+
+   BASE_XXXX_SERVER
+      - usermethod_1_stub(); ...
+      - usermethod_n_stub();
+      OBJSESSION
+      - _session_control; _session_notify; _session_query; (stubs)
+      XXXX_SERVER
+      - any server-directed methods go here
+         XXXX
+         - see user method impls above
+
+
+STUBXXXX<T extends XXXX> extends STUBBASE<T>
+   - ctor(DeliveryService, pool, pool);
+   - stubhelper();
+
+   STUBBASE<T> (DeliveryService, pool, pool)
+      T obj;
+      DeliveryService, Pool, Pool;
+      stub_helper();  
+
+      STUBPOOLRUNNABLE
+      - set(stub_helper, who, msg); (set state, not in java)
+      - stubhelper(stub, delsvc, obj, who, msg);  
+
+      SESSIONMESSAGE
+      - session_message(who, message); 
+        { stubhelper = message.type.get_helper() 
+          stubhelper(delsvc, obj, who, msg); // the stubhelper impl calls threadpool run()
+        }
+      - session_control; session_notify; session_query;  
+
+
+STUBXXXSERVER  (generated, hand-codable)
+   - ctor(DeliveryService, XXXXSERVER, pool, pool);
+   - for each type in the vf, 
+        -- implement a run method (threadproc) for the message type 
+        -- implement a stub helper method which calls the threadpool run using the threadproc above
+    
+   STUBXXXX
+
+#endif
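+
+
+/* the following block is a minimal sketch, compiled out, of the generated
+ * per-type "stub helper" described in the file header, for a hypothetical
+ * "add" message. the field keys _mf_x and _mf_y, the impl accessor
+ * server->add, and the etch_int32 casts are illustrative assumptions;
+ * the real generated code is service-specific.
+ */
+#if(0)
+int etchstub_run_xxxx_add (etch_stub* stub, i_delivery_service* ds,
+    i_xxxx_server* server, etch_who* whofrom, etch_message* msg)
+{
+    /* fetch the arguments to the calculation from the message
+     * (_mf_x and _mf_y are hypothetical field keys) */
+    etch_int32* x = (etch_int32*) message_get (msg, _mf_x);
+    etch_int32* y = (etch_int32*) message_get (msg, _mf_y);
+
+    /* invoke the user-coded calculation in the impl_xxxx_server
+     * (the add accessor is a hypothetical generated member) */
+    etch_int32* resultobj = server->add (server, x, y);
+
+    /* insert the result into a reply message and transport it back */
+    return etchstub_send_reply (stub, ds, whofrom, msg, (objmask*) resultobj);
+}
+#endif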
+
+
+/* - - - - - - - - - - - - - - - - - -  
+ * stub base method implementations
+ * - - - - - - - - - - - - - - - - - -  
+ */ 
+
+/**
+ * destroy_stub()
+ * etch_stub destructor. destroy stub base object.
+ * the stub base does not destroy its xxxx_either_stub wrapper.
+ */
+int destroy_stub(etch_stub* thisx)
+{
+    if (thisx->refcount > 0 && --thisx->refcount > 0) return -1;  
+
+    if (!is_etchobj_static_content(thisx))
+    {
+        /* note that a delivery service session is the stub's session.
+         * the delivery service owns the stub, and the stub owns this shared
+         * session. the stub wrapper is the session interface's thisx.
+         * the delivery service gets xxxx_either_stub* stubobj from its  
+         * isessionmsg->thisx, and etch_stub* stubbase from stubobj.stub_base.
+         * destroying the stub will destroy the shared i_sessionmessage, so 
+         * the delivery service's session will become invalid once its stub 
+         * is destroyed. see transport.destroy_delivery_service_stub().
+         */
+        if (thisx->isessionmsg) 
+            thisx->isessionmsg->destroy(thisx->isessionmsg);
+
+        if (thisx->obj && thisx->is_implobj_owned)
+            thisx->obj->destroy(thisx->obj);
+    }
+
+    return destroy_objectex((objmask*) thisx);
+}
+
+
+/**
+ * new_stub()
+ * etch_stub (stub base) constructor.  
+ */
+etch_stub* new_stub (void* implobj, unsigned char stubtype, 
+   i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp) 
+{
+    i_sessionmessage* ism = NULL;
+
+    etch_stub* stubbase = (etch_stub*) new_object  
+       (sizeof(etch_stub), ETCHTYPEB_STUB, CLASSID_STUB);
+
+    ETCH_ASSERT(implobj && stubtype && qp && fp);
+    ETCH_ASSERT(is_etch_ideliverysvc(ids));
+
+    stubbase->stub_type = stubtype;
+    stubbase->destroy   = destroy_stub; 
+
+    stubbase->obj = implobj; /* server or client impl e.g. i_xxxx_server* */
+    stubbase->delivery_service  = ids;
+
+    /* instantiate i_sessionmessage session interface. note that the interface's
+     * thisx pointer is this base stub object, and that the base stub's container
+     * xxxx_either_stub* is the base stub.stubobj.
+     */
+    ism = new_sessionmsg_interface (stubbase, etchstub_session_message, NULL); 
+    stubbase->isessionmsg = ism;
+    stubbase->session_message = ism->session_message = etchstub_session_message;
+    stubbase->session_control = ism->session_control = etchstub_session_control;
+    stubbase->session_notify  = ism->session_notify  = etchstub_session_notify;
+    stubbase->session_query   = ism->session_query   = etchstub_session_query;
+
+    /* set delivery service session interface to be the stub's i_sessionmessage.
+     * in a c binding translation from the java binding, a reference to a stub
+     * object is only via the delivery service's session_message interface,
+     * shared between the stub and the delivery service.
+     * the delivery service must have provided i_transportmessage implementations,
+     * in particular for set_session(). the instantiator of the delivery service
+     * must therefore override the delivery service's i_transportmessage.
+     */
+
+    /* fyi: ids->itm->thisx is mailbox manager.
+     * ids->itm is mboxmgr's transport interface, which is the delivery service.
+     * set session of next lower level (delivery service) to the stub's session.
+     */
+    ids->itm->set_session (ids, stubbase->isessionmsg);   
+
+    /* copy impl's i_objsession to the stub for convenience */
+    stubbase->impl_callbacks = etchstub_get_session_callbacks_from (implobj); 
+ 
+    return stubbase;
+}
+
+ 
+/**
+ * etchstub_get_session_callbacks_from()
+ * extract objsession interface from specified stub implementor object.
+ */
+i_objsession* etchstub_get_session_callbacks_from (void* _obj)
+{
+    i_objsession* iobjsession = NULL;
+    objmask*  obj = (objmask*) _obj; 
+    const int this_objtype = obj? obj->obj_type: 0;
+
+    switch(this_objtype)
+    {
+        case ETCHTYPEB_EXESERVERIMPL:
+        {    xxxx_server_impl* server_impl = (xxxx_server_impl*) obj;
+             iobjsession = server_impl->iobjsession;
+             break;
+        }
+
+        case ETCHTYPEB_EXECLIENTIMPL:
+        {    xxxx_client_impl* client_impl = (xxxx_client_impl*) obj;
+             iobjsession = client_impl->iobjsession;
+             break;
+        }
+
+        case ETCHTYPEB_EXESERVERBASE:
+        {   i_xxxx_server* iserver = (i_xxxx_server*) obj;
+            xxxx_server_impl* server_impl = (xxxx_server_impl*) iserver->thisx;
+            ETCH_ASSERT(server_impl && server_impl->obj_type == ETCHTYPEB_EXESERVERIMPL);
+            iobjsession = server_impl->iobjsession;
+            break;
+        }
+
+        case ETCHTYPEB_EXECLIENTBASE:
+        {   i_xxxx_client* iclient = (i_xxxx_client*) obj;
+            xxxx_client_impl* client_impl = (xxxx_client_impl*) iclient->thisx;
+            ETCH_ASSERT(client_impl && client_impl->obj_type == ETCHTYPEB_EXECLIENTIMPL);
+            iobjsession = client_impl->iobjsession;
+            break;
+        }
+    }
+
+    return iobjsession;
+}
+
+
+/**
+ * etchstub_put_exception
+ * put serializable exception to reply message.
+ * @param replymsg the message, caller retains.
+ * @param fromobj the etch object or etch_exception wrapping the etchexception,
+ * caller retains.
+ */
+int etchstub_put_exception (etch_stub* stub, etch_message* replymsg, objmask* fromobj)
+{
+    int result = 0;
+    etchlog(ETCHSTUB, ETCHLOG_INFO, "throwing remote exception\n"); 
+
+    /* insert serializable exception to reply message */
+    result = message_throw_from (replymsg, fromobj);  
+
+    /* notify the session with a copy of the exception, relinquished */
+    stub->session_notify (stub, new_etch_exception_from (fromobj));
+    return result;
+}
+
+ 
+/**
+ * etchstub_put_resultobj
+ * insert specified result object to message.
+ * @param replymsg the message, caller retains.
+ * @param resultobj the result object, caller relinquishes regardless of result.
+ * @return 0 success, -1 failure. 
+ */
+int etchstub_put_resultobj (etch_stub* stub, etch_message* replymsg, objmask* resultobj)
+{
+    int result = 0;
+    etch_field* key_result = builtins._mf_result;
+
+    /* check if the message already has a result object, i.e., an exception */
+    if (message_get (replymsg, key_result))
+        resultobj->destroy(resultobj);  /* if so, discard the passed result */
+    else  /* resultobj is relinquished here regardless of result */
+        result = message_put (replymsg, clone_field(key_result), (objmask*) resultobj); 
+
+    return result;
+}
+
+ 
+/**
+ * etchstub_send_reply
+ * private method invoked by all "stub helpers" (message logic implementation 
+ * runners) for messages requiring a reply, in order to instantiate a reply
+ * message and transmit it back to sender.
+ */
+int etchstub_send_reply (etch_stub* stub, i_delivery_service* dsvc, 
+    etch_who* whofrom, etch_message* msg, objmask* resultobj)
+{
+    int result = 0; 
+    const int is_exception_resobj  = is_exception(resultobj);
+    const int is_exception_message = is_exception(msg);
+    const int is_exception_result  = is_exception_resobj || is_exception_message;
+    etch_type* newtype = NULL; /* see comment following */
+
+    /* instantiate the reply message  
+     * note that if we are called for a one-way message, it is only if an 
+     * exception was thrown by the implementation, and the exception is 
+     * therefore to be the reply. in such a case, the etch compiler generates
+     * a message type as newtype above, which is the second parameter to 
+     * message_reply(), in order that in_reply_to can be instantiated. 
+     * (our example below makes newtype the 1-way exception reply type in  
+     * this case). for two-way messages, newtype, and thus the second parameter
+     * to message_reply() is null.  
+     */
+    etch_message* replymsg = message_reply (msg, newtype);
+
+    if (NULL == replymsg && is_exception_result)
+    {   newtype  = builtins._mt__exception;
+        replymsg = message_reply (msg, newtype);
+    }
+
+    if (NULL == replymsg)
+    {   etch_type* replytype = newtype? newtype: message_type(msg);
+        char* logmask = "could not create reply message for type %s\n";
+        etchlog(ETCHSTUB, ETCHLOG_ERROR, logmask, replytype->aname);
+        return -1;
+    }
+
+    if (is_exception_resobj)  /* the result wraps, or is, an exception */
+        etchstub_put_exception (stub,  replymsg, resultobj);
+    else
+    if (is_exception_message) /* the message wraps an exception */
+        etchstub_put_exception (stub,  replymsg, (objmask*) msg);
+
+    /* insert result object to message. resultobj is relinquished here. 
+     * note that if message_put() or transport_message() were to fail, 
+     * the other side would not receive a result, and would be aware 
+     * only via timeout or via the peer connection dropping. 
+     */
+    result = etchstub_put_resultobj (stub, replymsg, resultobj);
+
+    if (0 == result)  /* forward reply message for serialization */
+        result = dsvc->itm->transport_message (dsvc, whofrom, replymsg);
+
+    return result;
+}
+
+
+/**
+ * etchstub_validate_args
+ * private method invoked by all "stub helpers" to validate parameters
+ * to the individual stub helper and populate some objects for the helper.
+ */
+int etchstub_validate_args (etch_stub* stub, i_delivery_service* dsvc, 
+    etch_message* msg, void* client, default_value_factory** vf, 
+    void** vfimpl, void** climpl)
+{
+    int result = 0;
+    i_xxxx_client* iclient = NULL;
+    objmask* clientimpl = NULL, *valufactimpl = NULL;
+    /* any assertion failure in this method indicates a coding error in etch core */
+    ETCH_ASSERT(stub && dsvc && client && msg && stub->params && vf && vfimpl && climpl);  
+
+    iclient = (i_xxxx_client*) client;
+    clientimpl = iclient->thisx;  
+    ETCH_ASSERT(is_etch_serverimpl(clientimpl));
+    *climpl = clientimpl;
+
+    *vf = (default_value_factory*) stub->params->in_valufact;
+    ETCH_ASSERT(is_etch_valuefact(*vf));
+    valufactimpl = (*vf)->impl;
+    ETCH_ASSERT(is_etch_valuefactimpl(valufactimpl));
+    *vfimpl = valufactimpl;
+
+    return result;
+}
+
+
+/**
+ * etchstub_session_notify_ex()
+ *
+ * @param thisx the object which will receive an exception if any.
+ * may be the same object as _obj. caller retains. 
+ *
+ * @param _obj caller retains. usually a xxxx_server_impl. may be null, or
+ * may be the same object as thisx. obj will belong to a known and limited set  
+ * of classes, e.g. ETCHTYPEB_EXESERVERIMPL, so we can test _obj.obj_type 
+ * if needed, and cast _obj to one of the mask objects (etch_svcobj_masks.h) 
+ * such as xxxx_server_impl, in order to reference the maskable content.
+ * 
+ * @param evt a notification event or a throwable exception, caller relinquishes.
+ *
+ * @return 0 success, -1 failure.
+ */
+int etchstub_session_notify_ex (void* thisx, void* _obj, etch_event* evt)     
+{
+    int result = -1, is_evtparm_relinquished = FALSE;
+
+    /* this call arrives from type "stub helpers" implemented in xxxx_server_stub.
+     * these are thread procedures with arguments delivery service, some object
+     * _obj (possibly implementing objsession or throwable), who, and message, 
+     * viz:  int (*stubhelper) (stub, deliverysvc, _obj, sender, message);  
+     * so from argument _obj, we extract the objsession interface and if present,    
+     * call its _session_notify. 
+     */
+
+    /* a server implementation (xxxx_server_impl) can request notifications of 
+     * exceptions and the like by implementing and registering i_objsession 
+     * callbacks. the presence of these function pointers in the implementation
+     * serves as the indicator of whether to forward the notification.
+     */
+    i_objsession* impl_callbacks = etchstub_get_session_callbacks_from (_obj);  
+
+    etch_session_notify impl_session_notify = impl_callbacks? 
+        impl_callbacks->_session_notify: NULL;
+
+    if (impl_session_notify) /* if impl has requested this event, forward it */
+    {   /* event is relinquished to notify handlers by contract */
+        result = impl_session_notify (impl_callbacks, evt);
+        is_evtparm_relinquished = TRUE;  
+    }
+
+    if (0 != result)
+    {   char* logmask = impl_session_notify? logmask1: logmask2;
+        etchlog(ETCHSTUB, ETCHLOG_DEBUG, logmask, snstr);
+    }
+
+    if (!is_evtparm_relinquished)  
+        ETCHOBJ_DESTROY(evt);
+
+    return result;
+}
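+
+
+/* the following block is a minimal sketch, compiled out, of how a server
+ * impl might register an i_objsession callback so the stub forwards
+ * notifications to it. my_session_notify and the iobjsession wiring are
+ * hypothetical; actual registration is done in the impl constructor.
+ */
+#if(0)
+int my_session_notify (i_objsession* thisx, etch_event* evt)
+{
+    /* the event is relinquished to this handler by contract */
+    etchlog(ETCHSTUB, ETCHLOG_INFO, "impl notified of event\n");
+    ETCHOBJ_DESTROY(evt);
+    return 0;
+}
+
+/* ... in the xxxx_server_impl constructor (hypothetical):
+ * server_impl->iobjsession->_session_notify = my_session_notify;
+ */
+#endif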
+
+
+/* - - - - - - - - - - - - - - - - - - - - -
+ * stub helper threadpool execution support
+ * - - - - - - - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etchstub_loghelper()
+ * convenience method to log entry or exit of a stub helper function.
+ */
+void etchstub_loghelper (opaque_stubhelper f, 
+    const int result, const int thread_id, const int is_start)
+{
+    if (config.loglevel <= ETCHLOG_DEBUG)  
+    {   char onthread[32];
+        if  (thread_id) 
+             sprintf(onthread, "on thread %d", thread_id);
+        else *onthread = '\0';
+
+        if (is_start)
+             etchlog(ETCHSTUB, ETCHLOG_DEBUG, "start stub runner %x %s\n", 
+                f, onthread); 
+        else etchlog(ETCHSTUB, ETCHLOG_DEBUG, "exit stub runner %x (%d) %s\n", 
+                f, result, onthread); 
+    }
+}
+
+
+/**
+ * etchstub_proxy_threadproc()
+ * a thread procedure conforming to the signature expected by etch_threadpool,
+ * which accepts a stub parameter bundle as params.data, unbundles the parameters, 
+ * and invokes the included stub helper function with those parameters.
+ */
+void etchstub_proxy_threadproc (etch_threadparams* tp)
+{
+    int result = 0;
+    etchstub_runparams* p = tp? tp->data: NULL;
+    ETCH_ASSERT(p && p->run && (p->sig == ETCH_STUBPARAMS_SIGNATURE));
+    etchstub_loghelper (p->run, 0, tp->etch_thread_id, 1);
+
+    /* run "stub helper" procedure */
+    result = p->run (p->stub, p->ds, p->server, p->who, p->msg); 
+
+    etchstub_loghelper (p->run, result, tp->etch_thread_id, 0); 
+    /* etch_threadparams is marked as owning data so we don't destroy p here */
+}
+
+
+/**
+ * new_etch_stubparams()
+ * bundle up stub runner parameters so that they can become
+ * etch_threadparams.data, passed in to etchstub_proxy_threadproc.
+ */
+etchstub_runparams* new_etch_stubparams (etch_stub* stub, opaque_stubhelper runproc, 
+    i_delivery_service* ds, i_xxxx_server* server, etch_who* whofrom, etch_message* msg)
+{
+    etchstub_runparams* p = etch_malloc(sizeof(etchstub_runparams), 0); 
+    p->sig  = ETCH_STUBPARAMS_SIGNATURE;
+    p->stub = stub;
+    p->run  = runproc;
+    p->ds   = ds;
+    p->who  = whofrom;
+    p->msg  = msg;
+    p->server = server;
+    return p;
+}
+
+
+/**
+ * etchstub_run()
+ * execute the service method helper function (stub helper). if no threadpool
+ * is specified, execute the helper function inline (on caller's thread);  
+ * otherwise wrap the stub helper arguments and launch a thread on the supplied  
+ * threadpool to execute the helper function.
+ * @param stub caller retains.
+ * @param runproc the service function for this message type. 
+ * @param threadpool caller retains.
+ * @param server caller retains.
+ * @param who caller retains.
+ * @param msg caller retains.
+ * @return 0 success, -1 failure.
+ */
+int etchstub_run (etch_stub* stub, opaque_stubhelper runproc, etch_threadpool* threadpool, 
+    i_xxxx_server* server, etch_who* who, etch_message* msg)
+{
+    int result = 0;
+    i_delivery_service* ds = stub->delivery_service;
+    ETCH_ASSERT(runproc && ds && server && msg);
+
+    if (threadpool)
+    {   
+        /* a pointer to this parameter bundle will become etch_threadparams.data. 
+         * etch_threadparams.is_own_data determines if the thread frees data on exit.
+         * is_own_data assumes the value of threadpool.is_free_data, which is true
+         * by default. so unless we have reset is_free_data on this pool, the pool  
+         * thread will free this allocation at thread exit. and since also by default
+         * pool->is_data_etchobject is false, the deallocation will use etch_free;
+         */
+        etchstub_runparams* p = new_etch_stubparams(stub, runproc, ds, server, who, msg); 
+        
+        etch_thread* poolthread  /* run stub helper function on a pool thread */
+           = threadpool->run (threadpool, etchstub_proxy_threadproc, p);
+
+        if  (poolthread) /* switch context and block on pool thread exit */
+        {    const int id = poolthread->params.etch_thread_id;
+             etchlog(ETCHSTUB, ETCHLOG_XDEBUG, "stub runner joining pool thread %d ...\n", id);
+             etch_join (poolthread);  
+        }
+        else result = -1;
+    }
+    else  
+    {   etchstub_loghelper(runproc, 0, 0, 1);
+
+        /* run stub helper function on this thread */
+        result = runproc (stub, ds, server, who, msg); 
+ 
+        etchstub_loghelper(runproc, result, 0, 0); 
+    }
+
+    return result;
+}
+
+
+/* - - - - - - - - - -
+ * i_sessionmessage
+ * - - - - - - - - - - 
+ */
+
+/**
+ * etchstub_session_message()
+ * @param whofrom caller retains, can be null.
+ * @param msg caller relinquishes
+ * @return 0 (message handled), or -1 (error, closed, or timeout)  
+ */
+int etchstub_session_message (etch_stub* thisx, etch_who* whofrom, etch_message* msg)
+{
+    xxxx_either_stub* stubimpl   = (xxxx_either_stub*) thisx->stubobj;
+    i_xxxx_server*  serverimpl   = NULL;
+    etch_session* sessionparams  = NULL;
+    opaque_stubhelper stubhelper = NULL;
+    etch_threadpool*  threadpool = NULL;
+    int result = -1, session_id = 0;
+    unsigned char async_mode = 0;
+
+    /* get the message type object associated with this message */
+    etch_type*  thistype = message_type(msg);
+    ETCH_ASSERT(thistype);
+    ETCH_ASSERT(is_etch_stub(stubimpl));
+    session_id = stubimpl->owner_id;
+
+    do
+    {   get_etch_session (thisx->params, session_id, &sessionparams);
+        ETCH_ASSERT (sessionparams);
+        serverimpl = sessionparams->server;
+
+        /* get the message type's stub helper (runner) function.
+         * the stub helper is a pointer to a thread procedure function specific    
+         * to the message type, which executes the associated service method.
+         */
+        if (NULL == (stubhelper = etchtype_get_type_stubhelper (thistype)))
+        {   char* msgmask = "type '%s' missing stub runner procedure\n";
+            etchlog(ETCHSTUB, ETCHLOG_ERROR, msgmask, thistype->aname);
+            break;
+        }
+
+        /* get the thread pool, if any, appropriate for the 
+         * configured mode of execution for this message type 
+         */
+        switch(async_mode = etchtype_get_async_mode (thistype))
+        {   case ETCH_ASYNCMODE_QUEUED: threadpool = thisx->params->qpool; break;
+            case ETCH_ASYNCMODE_FREE:   threadpool = thisx->params->fpool; break;
+            default: threadpool = NULL;
+        }
+
+        /* execute the service method execution function (stub helper),  
+         * either on a thread or inline. 
+         */
+        result = etchstub_run (thisx, stubhelper, threadpool, serverimpl, whofrom, msg);
+
+    } while(0);
+
+    /* this is the end of the line for the message */
+    msg->destroy(msg); 
+
+    return result;
+}
+
+
+/**
+ * etchstub_session_control()
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ */
+int etchstub_session_control (etch_stub* thisx, etch_event* control, objmask* value)
+{
+    int result = -1, is_params_relinquished = FALSE;
+
+    /* a server implementation (xxxx_server_impl) can request notifications of 
+     * exceptions and the like by implementing and registering i_objsession 
+     * callbacks. the presence of these function pointers in the implementation
+     * serves as the indicator of whether to forward the notification.
+     */
+    i_objsession* impl_callbacks = thisx->impl_callbacks; 
+ 
+    etch_session_control impl_session_control = impl_callbacks? 
+        impl_callbacks->_session_control: NULL;     
+
+    if (impl_session_control) /* if impl has requested this event, forward it */
+        if (0 == (result = impl_session_control (impl_callbacks, control, value)))
+            is_params_relinquished = TRUE;
+
+    if (0 != result)
+    {   char* logmask = impl_session_control? logmask1: logmask2;
+        etchlog(ETCHSTUB, ETCHLOG_ERROR, logmask, scstr);
+    }
+
+    if (!is_params_relinquished) 
+    {   ETCHOBJ_DESTROY(control);
+        ETCHOBJ_DESTROY(value);
+    }
+
+    return result;
+}
+
+
+/**
+ * etchstub_session_notify()
+ * @param evt event, caller relinquishes.
+ */
+int etchstub_session_notify (etch_stub* thisx, etch_event* evt)
+{
+    const int result = etchstub_session_notify_ex (thisx, thisx->obj, evt);
+    return result;
+}
+
+
+/**
+ * etchstub_session_query()
+ * @param query, caller relinquishes.
+ */
+objmask* etchstub_session_query (etch_stub* thisx, objmask* query) 
+{
+    objmask* resultobj = NULL;
+    int is_params_relinquished = FALSE;
+
+    /* a server implementation (xxxx_server_impl) can request notifications of 
+     * exceptions and the like by implementing and registering i_objsession 
+     * callbacks. the presence of these function pointers in the implementation
+     * serves as the indicator of whether to forward the notification.
+     */
+    i_objsession* impl_callbacks = thisx->impl_callbacks;
+ 
+    etch_session_query impl_session_query = impl_callbacks? 
+        impl_callbacks->_session_query: NULL;
+
+    if (impl_session_query) /* if impl has requested this event, forward it */
+        if (NULL != (resultobj = impl_session_query (impl_callbacks, query)))
+            is_params_relinquished = TRUE;
+
+    if (NULL == resultobj)
+    {   char* logmask = impl_session_query? logmask1: logmask2;
+        etchlog(ETCHSTUB, ETCHLOG_ERROR, logmask, sqstr);
+    }
+
+    if (!is_params_relinquished) 
+        ETCHOBJ_DESTROY(query);
+
+    return resultobj;
+}
+
+
+/* - - - - - - - - - - - - - - -  
+ * constructors, destructor 
+ * - - - - - - - - - - - - - - -  
+ */ 
+
+/**
+ * destroy_stub_object()
+ * stub implementation private destructor.
+ * calls back to hand-coded destructor for possible custom deallocation,
+ * then destroys the stub base, including the shared session interface.
+ */
+int destroy_stub_object (void* thisx)
+{
+    xxxx_either_stub* stubobj = NULL;
+    if (NULL == thisx) return -1;
+
+    stubobj = (xxxx_either_stub*) thisx;
+    if (!is_etch_stub(thisx)) return -1;
+
+    if (stubobj->refcount > 0 && --stubobj->refcount > 0) return -1;  
+
+    if (!is_etchobj_static_content(stubobj))
+    {    
+        if (stubobj->destroyex) /* call back to user dtor */
+            stubobj->destroyex(thisx);
+
+        if (stubobj->stub_base)
+            stubobj->stub_base->destroy(stubobj->stub_base);
+    }
+            
+    return destroy_objectex((objmask*)stubobj);
+} 
+
+
+/**
+ * is_etch_stub()
+ */
+int is_etch_stub(void* x)
+{
+    int result = FALSE, objtype = x? ((objmask*)x)->obj_type: 0;
+    switch(objtype)
+    { case ETCHTYPEB_STUB: case ETCHTYPEB_CLIENTSTUB: case ETCHTYPEB_SERVERSTUB:
+           result = TRUE;
+    }
+    return result;
+}            
+
+
+/**
+ * new_stubimpl_init()
+ * generic stub implementation constructor
+ * @param implobj i_xxxx_client* or i_xxxx_server*.
+ * @param objsize number of bytes in the stub object implementation. 
+ * all we know about here is the mask, or object header if you will.
+ * @param stubtype ETCH_STUBTYPE_CLIENT or ETCH_STUBTYPE_SERVER.
+ * @param userdtor a hand-coded callback in the implementation conforming 
+ * to signature etch_destructor, for the purpose of freeing any custom 
+ * memory allocations.
+ * @param ids the delivery service, caller retains.
+ * @param qp the queued thread pool, optional, caller retains. 
+ * @param fp the free thread pool, optional, caller retains. 
+ * @param params an etch_server_factory* parameter bundle, caller retains.
+ * if it is always the case that this parameter is present and is an
+ * etch_server_factory*, we can lose the ids, qp, and fp ctor parameters.
+ */
+void* new_stubimpl_init (void* implobj, const int objsize, 
+      const unsigned char stubtype, etch_destructor userdtor,
+      i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp, 
+      void* params)  
+{
+    unsigned short obj_type, class_id;
+    xxxx_either_stub* newstub = NULL;
+
+    switch(stubtype)
+    {  
+        case ETCH_STUBTYPE_CLIENT:
+             obj_type = ETCHTYPEB_CLIENTSTUB;
+             class_id = CLASSID_CLIENTSTUB;
+             break;
+        case ETCH_STUBTYPE_SERVER:
+             obj_type = ETCHTYPEB_SERVERSTUB;
+             class_id = CLASSID_SERVERSTUB;
+             break;
+        default: return NULL;
+    }
+
+    newstub = (xxxx_either_stub*) new_object (objsize, obj_type, class_id);  
+    newstub->destroy   = destroy_stub_object; /* private */
+    newstub->destroyex = userdtor;           /* public */
+    newstub->stub_base = new_stub (implobj, stubtype, ids, qp, fp);
+    newstub->stub_base->stubobj = (objmask*) newstub;
+    newstub->stub_base->params  = params;
+ 
+    return newstub;
+}
+
+
+/**
+ * new_clientstub_init()
+ * generic client stub implementation constructor.
+ * @param bytelen number of bytes in the client stub object implementation. 
+ * all we know about here is the mask, or object header if you will.
+ * @param dtor a hand-coded callback in the implementation conforming 
+ * to signature etch_destructor, for the purpose of freeing any custom 
+ * memory allocations.
+ * @param ids the delivery service, caller retains.
+ * @param qp the queued thread pool, optional, caller retains. 
+ * @param fp the free thread pool, optional, caller retains. 
+ * @param params an etch_server_factory* parameter bundle, caller retains.
+ * if it is always the case that this parameter is present and is an
+ * etch_server_factory*, we can lose the ids, qp, and fp ctor parameters.
+ */
+void* new_clientstub_init (void* implobj, const int bytelen, etch_destructor dtor, 
+    i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp, void* params)  
+{
+    return new_stubimpl_init (implobj, bytelen, ETCH_STUBTYPE_CLIENT, 
+        dtor, ids, qp, fp, params);  
+}
+
+
+/**
+ * new_serverstub_init()
+ * generic server stub implementation constructor.
+ * @param bytelen number of bytes in the server stub object implementation. 
+ * all we know about here is the mask, or object header if you will.
+ * @param dtor a hand-coded callback in the implementation conforming 
+ * to signature etch_destructor, for the purpose of freeing any custom 
+ * memory allocations.
+ * @param ids the delivery service, caller retains.
+ * @param qp the queued thread pool, optional, caller retains. 
+ * @param fp the free thread pool, optional, caller retains. 
+ * @param params an etch_server_factory* parameter bundle, caller retains.
+ * if it is always the case that this parameter is present and is an
+ * etch_server_factory*, we can lose the ids, qp, and fp ctor parameters.
+ */
+void* new_serverstub_init (void* implobj, const int bytelen, etch_destructor dtor, 
+    i_delivery_service* ids, etch_threadpool* qp, etch_threadpool* fp, void* params)  
+{
+    return new_stubimpl_init (implobj, bytelen, ETCH_STUBTYPE_SERVER, 
+        dtor, ids, qp, fp, params);  
+}
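+
+
+/* the following block is a minimal sketch, compiled out, of what a generated
+ * stub_xxxx_server constructor might do: allocate the stub via
+ * new_serverstub_init and point each message type at its stub helper.
+ * etchtype_set_type_stubhelper is assumed here as the counterpart of the
+ * etchtype_get_type_stubhelper used in etchstub_session_message(); the type
+ * and helper names (see the sketch near the top of this file) are hypothetical.
+ */
+#if(0)
+void* new_stub_xxxx_server (i_xxxx_server* implobj, i_delivery_service* ids,
+    etch_threadpool* qp, etch_threadpool* fp, void* params)
+{
+    etch_type* add_type = NULL; /* in real code, from the service value factory */
+
+    xxxx_either_stub* stub = new_serverstub_init (implobj,
+        sizeof(xxxx_either_stub), NULL, ids, qp, fp, params);
+
+    /* set the virtual stub helper for each message type in the service */
+    etchtype_set_type_stubhelper (add_type, etchstub_run_xxxx_add);
+
+    return stub;
+}
+#endif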
+
+
+/* - - - - - - - - - - - - - - - - - - - - -
+ * generic methods on a stub implementation 
+ * - - - - - - - - - - - - - - - - - - - - -
+ */ 
+
+

Added: incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/support/etch_threadpool_apr.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,1028 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership.  The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License.  You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/*
+ * etch_threadpool_apr.c
+ * apache portable runtime threadpool code
+ */
+
+#include <assert.h>
+#include "etch_threadpool_apr.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+#include "apr_portable.h"
+
+#define TASK_PRIORITY_SEGS 4       
+#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
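+
+/* e.g. a task priority of 200 maps to segment (200 & 0xFF) / 64 = 3, and a
+ * priority of 63 maps to segment 0: the 0..255 priority range is split
+ * across the TASK_PRIORITY_SEGS buckets indexed by task_idx below. */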
+
+
+typedef struct apr_thread_pool_task
+{
+    APR_RING_ENTRY(apr_thread_pool_task) link;
+    apr_thread_start_t func;
+    void *param;
+    void *owner;
+    union
+    {
+        apr_byte_t priority;
+        apr_time_t time;
+    } dispatch;
+} apr_thread_pool_task_t;
+
+
+APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
+
+
+struct apr_thread_list_elt
+{
+    APR_RING_ENTRY(apr_thread_list_elt) link;
+    apr_thread_t *thd;
+    volatile void *current_owner;
+    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
+};
+
+
+APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
+
+
+struct apr_thread_pool
+{
+    apr_pool_t *pool;
+    volatile apr_size_t thd_max;
+    volatile apr_size_t idle_max;
+    volatile apr_interval_time_t idle_wait;
+    volatile apr_size_t thd_cnt;
+    volatile apr_size_t idle_cnt;
+    volatile apr_size_t task_cnt;
+    volatile apr_size_t scheduled_task_cnt;
+    volatile apr_size_t threshold;
+    volatile apr_size_t tasks_run;
+    volatile apr_size_t tasks_high;
+    volatile apr_size_t thd_high;
+    volatile apr_size_t thd_timed_out;
+    struct apr_thread_pool_tasks *tasks;
+    struct apr_thread_pool_tasks *scheduled_tasks;
+    struct apr_thread_list *busy_thds;
+    struct apr_thread_list *idle_thds;
+    apr_thread_mutex_t *lock;
+    apr_thread_mutex_t *cond_lock;
+    apr_thread_cond_t *cond;
+    volatile int terminated;
+    struct apr_thread_pool_tasks *recycled_tasks;
+    struct apr_thread_list *recycled_thds;
+    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
+};
+
+
+
+static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
+                                          apr_size_t init_threads,
+                                          apr_size_t max_threads)
+{
+    apr_status_t rv;
+    int i;
+
+    me->thd_max = max_threads;
+    me->idle_max = init_threads;
+    me->threshold = init_threads / 2;
+    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
+                                 me->pool);
+    if (APR_SUCCESS != rv) {
+        return rv;
+    }
+    rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,
+                                 me->pool);
+    if (APR_SUCCESS != rv) {
+        apr_thread_mutex_destroy(me->lock);
+        return rv;
+    }
+    rv = apr_thread_cond_create(&me->cond, me->pool);
+    if (APR_SUCCESS != rv) {
+        apr_thread_mutex_destroy(me->lock);
+        apr_thread_mutex_destroy(me->cond_lock);
+        return rv;
+    }
+    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
+    if (!me->tasks) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
+    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
+    if (!me->scheduled_tasks) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
+    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
+    if (!me->recycled_tasks) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
+    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
+    if (!me->busy_thds) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
+    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
+    if (!me->idle_thds) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
+    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
+    if (!me->recycled_thds) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
+    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
+    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
+    me->idle_wait = 0;
+    me->terminated = 0;
+    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
+        me->task_idx[i] = NULL;
+    }
+    goto FINAL_EXIT;
+  CATCH_ENOMEM:
+    rv = APR_ENOMEM;
+    apr_thread_mutex_destroy(me->lock);
+    apr_thread_mutex_destroy(me->cond_lock);
+    apr_thread_cond_destroy(me->cond);
+  FINAL_EXIT:
+    return rv;
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
+{
+    apr_thread_pool_task_t *task = NULL;
+    int seg;
+
+    /* check for scheduled tasks */
+    if (me->scheduled_task_cnt > 0) {
+        task = APR_RING_FIRST(me->scheduled_tasks);
+        assert(task != NULL);
+        assert(task !=
+               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                                 link));
+        /* if it's time */
+        if (task->dispatch.time <= apr_time_now()) {
+            --me->scheduled_task_cnt;
+            APR_RING_REMOVE(task, link);
+            return task;
+        }
+    }
+    /* check for normal tasks if we're not returning a scheduled task */
+    if (me->task_cnt == 0) {
+        return NULL;
+    }
+
+    task = APR_RING_FIRST(me->tasks);
+    assert(task != NULL);
+    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
+    --me->task_cnt;
+    seg = TASK_PRIORITY_SEG(task);
+    if (task == me->task_idx[seg]) {
+        me->task_idx[seg] = APR_RING_NEXT(task, link);
+        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+                                                   apr_thread_pool_task, link)
+            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+            me->task_idx[seg] = NULL;
+        }
+    }
+    APR_RING_REMOVE(task, link);
+    return task;
+}
+
+
+static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
+{
+    apr_thread_pool_task_t *task = NULL;
+
+    task = APR_RING_FIRST(me->scheduled_tasks);
+    assert(task != NULL);
+    assert(task !=
+           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                             link));
+    return task->dispatch.time - apr_time_now();
+}
+
+
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
+                                           apr_thread_t * t)
+{
+    struct apr_thread_list_elt *elt;
+
+    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
+        elt = apr_pcalloc(me->pool, sizeof(*elt));
+        if (NULL == elt) {
+            return NULL;
+        }
+    }
+    else {
+        elt = APR_RING_FIRST(me->recycled_thds);
+        APR_RING_REMOVE(elt, link);
+    }
+
+    APR_RING_ELEM_INIT(elt, link);
+    elt->thd = t;
+    elt->current_owner = NULL;
+    elt->state = TH_RUN;
+    return elt;
+}
+
+
+
+/*
+ * The worker thread function. Takes a task from the queue and performs it if
+ * there is one. Otherwise, it puts itself into the idle thread list and waits
+ * for a signal to wake up.
+ * The thread terminates directly by detaching and exiting when asked to stop
+ * after finishing a task. Otherwise, the thread sits in the idle thread list
+ * and should be joined.
+ */
+static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_thread_pool_t *me = param;
+    apr_thread_pool_task_t *task = NULL;
+    apr_interval_time_t wait;
+    struct apr_thread_list_elt *elt;
+
+    apr_thread_mutex_lock(me->lock);
+    elt = elt_new(me, t);
+    if (!elt) {
+        apr_thread_mutex_unlock(me->lock);
+        apr_thread_exit(t, APR_ENOMEM);
+    }
+
+    while (!me->terminated && elt->state != TH_STOP) {
+        /* If this is not a new element, it was awakened from the idle list */
+        if (APR_RING_NEXT(elt, link) != elt) {
+            --me->idle_cnt;
+            APR_RING_REMOVE(elt, link);
+        }
+
+        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
+        task = pop_task(me);
+        while (NULL != task && !me->terminated) {
+            ++me->tasks_run;
+            elt->current_owner = task->owner;
+            apr_thread_mutex_unlock(me->lock);
+            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
+            task->func(t, task->param);
+            apr_thread_mutex_lock(me->lock);
+            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
+                                 apr_thread_pool_task, link);
+            elt->current_owner = NULL;
+            if (TH_STOP == elt->state) {
+                break;
+            }
+            task = pop_task(me);
+        }
+        assert(NULL == elt->current_owner);
+        if (TH_STOP != elt->state)
+            APR_RING_REMOVE(elt, link);
+
+        /* Test whether a busy thread has been asked to stop; such a thread is not joinable */
+        if ((me->idle_cnt >= me->idle_max
+             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
+             && !me->idle_wait)
+            || me->terminated || elt->state != TH_RUN) {
+            --me->thd_cnt;
+            if ((TH_PROBATION == elt->state) && me->idle_wait)
+                ++me->thd_timed_out;
+            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
+                                 apr_thread_list_elt, link);
+            apr_thread_mutex_unlock(me->lock);
+            apr_thread_detach(t);
+            apr_thread_exit(t, APR_SUCCESS);
+            return NULL;        /* should not get here, safety net */
+        }
+
+        /* busy thread becomes idle */
+        ++me->idle_cnt;
+        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
+
+        /* 
+         * If there is a scheduled task, always wait only until it is due,
+         * since there is no guarantee that the currently idle threads will
+         * be the ones awakened for the next scheduled task.
+         */
+        if (me->scheduled_task_cnt)
+            wait = waiting_time(me);
+        else if (me->idle_cnt > me->idle_max) {
+            wait = me->idle_wait;
+            elt->state = TH_PROBATION;
+        }
+        else
+            wait = -1;
+
+        apr_thread_mutex_unlock(me->lock);
+        apr_thread_mutex_lock(me->cond_lock);
+        if (wait >= 0) {
+            rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
+        }
+        else {
+            rv = apr_thread_cond_wait(me->cond, me->cond_lock);
+        }
+        apr_thread_mutex_unlock(me->cond_lock);
+        apr_thread_mutex_lock(me->lock);
+    }
+
+    /* idle thread been asked to stop, will be joined */
+    --me->thd_cnt;
+    apr_thread_mutex_unlock(me->lock);
+    apr_thread_exit(t, APR_SUCCESS);
+    return NULL;                /* should not get here, safety net */
+}
+
+static apr_status_t thread_pool_cleanup(void *me)
+{
+    apr_thread_pool_t *_self = me;
+
+    _self->terminated = 1;
+    etch_apr_thread_pool_idle_max_set(_self, 0);
+    while (_self->thd_cnt) {
+        apr_sleep(20 * 1000);   /* spin, sleeping 20 ms per iteration */
+    }
+    apr_thread_mutex_destroy(_self->lock);
+    apr_thread_mutex_destroy(_self->cond_lock);
+    apr_thread_cond_destroy(_self->cond);
+    return APR_SUCCESS;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_create(apr_thread_pool_t ** me,
+                                                 apr_size_t init_threads,
+                                                 apr_size_t max_threads,
+                                                 apr_pool_t * pool)
+{
+    apr_thread_t *t;
+    apr_status_t rv = APR_SUCCESS;
+
+    *me = apr_pcalloc(pool, sizeof(**me));
+    if (!*me) {
+        return APR_ENOMEM;
+    }
+
+    (*me)->pool = pool;
+
+    rv = thread_pool_construct(*me, init_threads, max_threads);
+    if (APR_SUCCESS != rv) {
+        *me = NULL;
+        return rv;
+    }
+    apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
+                              apr_pool_cleanup_null);
+
+    while (init_threads) {
+        rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
+        if (APR_SUCCESS != rv) {
+            break;
+        }
+        ++(*me)->thd_cnt;
+        if ((*me)->thd_cnt > (*me)->thd_high)
+            (*me)->thd_high = (*me)->thd_cnt;
+        --init_threads;
+    }
+
+    return rv;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_destroy(apr_thread_pool_t * me)
+{
+    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
+}
+
+
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
+                                        apr_thread_start_t func,
+                                        void *param, apr_byte_t priority,
+                                        void *owner, apr_time_t time)
+{
+    apr_thread_pool_task_t *t;
+
+    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
+        t = apr_pcalloc(me->pool, sizeof(*t));
+        if (NULL == t) {
+            return NULL;
+        }
+    }
+    else {
+        t = APR_RING_FIRST(me->recycled_tasks);
+        APR_RING_REMOVE(t, link);
+    }
+
+    APR_RING_ELEM_INIT(t, link);
+    t->func = func;
+    t->param = param;
+    t->owner = owner;
+    if (time > 0) {
+        t->dispatch.time = apr_time_now() + time;
+    }
+    else {
+        t->dispatch.priority = priority;
+    }
+    return t;
+}
+
+
+
+/*
+ * Test if the task is the only one within the priority segment. 
+ * If it is not, return the first element with same or lower priority. 
+ * Otherwise, add the task into the queue and return NULL.
+ *
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
+                                            apr_thread_pool_task_t * const t)
+{
+    int seg;
+    int next;
+    apr_thread_pool_task_t *t_next;
+
+    seg = TASK_PRIORITY_SEG(t);
+    if (me->task_idx[seg]) {
+        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+               me->task_idx[seg]);
+        t_next = me->task_idx[seg];
+        while (t_next->dispatch.priority > t->dispatch.priority) {
+            t_next = APR_RING_NEXT(t_next, link);
+            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
+                t_next) {
+                return t_next;
+            }
+        }
+        return t_next;
+    }
+
+    for (next = seg - 1; next >= 0; next--) {
+        if (me->task_idx[next]) {
+            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
+            break;
+        }
+    }
+    if (0 > next) {
+        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
+    }
+    me->task_idx[seg] = t;
+    return NULL;
+}
+
+
+/*
+ *   schedule a task to run in "time" microseconds. Find the spot in the ring where
+ *   the time fits. Adjust the wait time so a thread wakes up when the time is reached.
+ */
+static apr_status_t schedule_task(apr_thread_pool_t *me,
+                                  apr_thread_start_t func, void *param,
+                                  void *owner, apr_interval_time_t time)
+{
+    apr_thread_pool_task_t *t;
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_t *thd;
+    apr_status_t rv = APR_SUCCESS;
+    apr_thread_mutex_lock(me->lock);
+
+    t = task_new(me, func, param, 0, owner, time);
+    if (NULL == t) {
+        apr_thread_mutex_unlock(me->lock);
+        return APR_ENOMEM;
+    }
+    t_loc = APR_RING_FIRST(me->scheduled_tasks);
+    while (NULL != t_loc) {
+        /* if the time is less than the entry's, insert ahead of it */
+        if (t->dispatch.time < t_loc->dispatch.time) {
+            ++me->scheduled_task_cnt;
+            APR_RING_INSERT_BEFORE(t_loc, t, link);
+            break;
+        }
+        else {
+            t_loc = APR_RING_NEXT(t_loc, link);
+            if (t_loc ==
+                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                                  link)) {
+                ++me->scheduled_task_cnt;
+                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
+                                     apr_thread_pool_task, link);
+                break;
+            }
+        }
+    }
+    /* there should be at least one thread for scheduled tasks */
+    if (0 == me->thd_cnt) {
+        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+        if (APR_SUCCESS == rv) {
+            ++me->thd_cnt;
+            if (me->thd_cnt > me->thd_high)
+                me->thd_high = me->thd_cnt;
+        }
+    }
+    apr_thread_mutex_unlock(me->lock);
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+    return rv;
+}
+
+
+
+static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
+                             void *param, apr_byte_t priority, int push,
+                             void *owner)
+{
+    apr_thread_pool_task_t *t;
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_t *thd;
+    apr_status_t rv = APR_SUCCESS;
+
+    apr_thread_mutex_lock(me->lock);
+
+    t = task_new(me, func, param, priority, owner, 0);
+    if (NULL == t) {
+        apr_thread_mutex_unlock(me->lock);
+        return APR_ENOMEM;
+    }
+
+    t_loc = add_if_empty(me, t);
+    if (NULL == t_loc) {
+        goto FINAL_EXIT;
+    }
+
+    if (push) {
+        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
+            t_loc = APR_RING_NEXT(t_loc, link);
+        }
+    }
+    APR_RING_INSERT_BEFORE(t_loc, t, link);
+    if (!push) {
+        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
+            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
+        }
+    }
+
+  FINAL_EXIT:
+    me->task_cnt++;
+    if (me->task_cnt > me->tasks_high)
+        me->tasks_high = me->task_cnt;
+    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
+                             me->task_cnt > me->threshold)) {
+        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+        if (APR_SUCCESS == rv) {
+            ++me->thd_cnt;
+            if (me->thd_cnt > me->thd_high)
+                me->thd_high = me->thd_cnt;
+        }
+    }
+    apr_thread_mutex_unlock(me->lock);
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+
+    return rv;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_push(apr_thread_pool_t *me,
+                                               apr_thread_start_t func,
+                                               void *param,
+                                               apr_byte_t priority,
+                                               void *owner)
+{
+    return add_task(me, func, param, priority, 1, owner);
+}
+
+
+
+apr_status_t etch_apr_thread_pool_schedule(apr_thread_pool_t *me,
+                                                   apr_thread_start_t func,
+                                                   void *param,
+                                                   apr_interval_time_t time,
+                                                   void *owner)
+{
+    return schedule_task(me, func, param, owner, time);
+}
+
+
+
+apr_status_t etch_apr_thread_pool_top(apr_thread_pool_t *me,
+                                              apr_thread_start_t func,
+                                              void *param,
+                                              apr_byte_t priority,
+                                              void *owner)
+{
+    return add_task(me, func, param, priority, 0, owner);
+}
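+
+
+/* the following block is a minimal usage sketch, compiled out: create a
+ * pool, push a task, schedule one, and destroy the pool. my_task and its
+ * param are hypothetical; the entry points are those defined above.
+ */
+#if(0)
+static void* APR_THREAD_FUNC my_task (apr_thread_t* t, void* param)
+{
+    /* do some work with param here */
+    return NULL;
+}
+
+static apr_status_t example (apr_pool_t* mempool)
+{
+    apr_thread_pool_t* pool = NULL;
+
+    /* two initial threads, at most eight */
+    apr_status_t rv = etch_apr_thread_pool_create(&pool, 2, 8, mempool);
+    if (APR_SUCCESS != rv) return rv;
+
+    /* queue a mid-priority task; owner can be used later to cancel it */
+    rv = etch_apr_thread_pool_push(pool, my_task, (void*)"some param", 128, NULL);
+
+    /* schedule the same task to run in one second (time is in microseconds) */
+    rv = etch_apr_thread_pool_schedule(pool, my_task, (void*)"some param",
+                                       1000000, NULL);
+
+    return etch_apr_thread_pool_destroy(pool);
+}
+#endif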
+
+
+
+static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
+                                           void *owner)
+{
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_pool_task_t *next;
+
+    t_loc = APR_RING_FIRST(me->scheduled_tasks);
+    while (t_loc !=
+           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                             link)) {
+        next = APR_RING_NEXT(t_loc, link);
+        /* if the task belongs to this owner, remove it */
+        if (t_loc->owner == owner) {
+            --me->scheduled_task_cnt;
+            APR_RING_REMOVE(t_loc, link);
+        }
+        t_loc = next;
+    }
+    return APR_SUCCESS;
+}
+
+
+
+static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
+{
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_pool_task_t *next;
+    int seg;
+
+    t_loc = APR_RING_FIRST(me->tasks);
+    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
+        next = APR_RING_NEXT(t_loc, link);
+        if (t_loc->owner == owner) {
+            --me->task_cnt;
+            seg = TASK_PRIORITY_SEG(t_loc);
+            if (t_loc == me->task_idx[seg]) {
+                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
+                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+                                                           apr_thread_pool_task,
+                                                           link)
+                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+                    me->task_idx[seg] = NULL;
+                }
+            }
+            APR_RING_REMOVE(t_loc, link);
+        }
+        t_loc = next;
+    }
+    return APR_SUCCESS;
+}
+
+
+
+static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
+{
+#ifndef NDEBUG
+    apr_os_thread_t *os_thread;
+#endif
+    struct apr_thread_list_elt *elt;
+    apr_thread_mutex_lock(me->lock);
+    elt = APR_RING_FIRST(me->busy_thds);
+    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
+        if (elt->current_owner != owner) {
+            elt = APR_RING_NEXT(elt, link);
+            continue;
+        }
+#ifndef NDEBUG
+        /* make sure the thread is not the one calling tasks_cancel */
+        apr_os_thread_get(&os_thread, elt->thd);
+#ifdef WIN32
+        /* hack for apr win32 bug */
+        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
+#else
+        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
+#endif
+#endif
+        while (elt->current_owner == owner) {
+            apr_thread_mutex_unlock(me->lock);
+            apr_sleep(200 * 1000);
+            apr_thread_mutex_lock(me->lock);
+        }
+        elt = APR_RING_FIRST(me->busy_thds);
+    }
+    apr_thread_mutex_unlock(me->lock);
+    return;
+}
+
+
+
+apr_status_t etch_apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
+                                                       void *owner)
+{
+    apr_status_t rv = APR_SUCCESS;
+
+    apr_thread_mutex_lock(me->lock);
+    if (me->task_cnt > 0) {
+        rv = remove_tasks(me, owner);
+    }
+    if (me->scheduled_task_cnt > 0) {
+        rv = remove_scheduled_tasks(me, owner);
+    }
+    apr_thread_mutex_unlock(me->lock);
+    wait_on_busy_threads(me, owner);
+
+    return rv;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_tasks_count(apr_thread_pool_t *me)
+{
+    return me->task_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
+{
+    return me->scheduled_task_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_threads_count(apr_thread_pool_t *me)
+{
+    return me->thd_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_busy_count(apr_thread_pool_t *me)
+{
+    return me->thd_cnt - me->idle_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_idle_count(apr_thread_pool_t *me)
+{
+    return me->idle_cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
+{
+    return me->tasks_run;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
+{
+    return me->tasks_high;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
+{
+    return me->thd_high;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
+{
+    return me->thd_timed_out;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
+{
+    return me->idle_max;
+}
+
+
+apr_interval_time_t etch_apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
+{
+    return me->idle_wait;
+}
+
+
+
+/*
+ * Stop surplus threads, leaving at most *cnt running in the selected list.
+ * @return the detached list of stopped threads; *cnt is updated to the
+ * number of threads actually stopped
+ * NOTE: busy threads may become idle while this function runs
+ */
+static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
+                                                apr_size_t *cnt, int idle)
+{
+    struct apr_thread_list *thds;
+    apr_size_t n, n_dbg, i;
+    struct apr_thread_list_elt *head, *tail, *elt;
+
+    apr_thread_mutex_lock(me->lock);
+    if (idle) {
+        thds = me->idle_thds;
+        n = me->idle_cnt;
+    }
+    else {
+        thds = me->busy_thds;
+        n = me->thd_cnt - me->idle_cnt;
+    }
+    if (n <= *cnt) {
+        apr_thread_mutex_unlock(me->lock);
+        *cnt = 0;
+        return NULL;
+    }
+    n -= *cnt;
+
+    head = APR_RING_FIRST(thds);
+    for (i = 0; i < *cnt; i++) {
+        head = APR_RING_NEXT(head, link);
+    }
+    tail = APR_RING_LAST(thds);
+    if (idle) {
+        APR_RING_UNSPLICE(head, tail, link);
+        me->idle_cnt = *cnt;
+    }
+
+    n_dbg = 0;
+    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+        elt->state = TH_STOP;
+        n_dbg++;
+    }
+    elt->state = TH_STOP;
+    n_dbg++;
+    assert(n == n_dbg);
+    *cnt = n;
+
+    apr_thread_mutex_unlock(me->lock);
+
+    APR_RING_PREV(head, link) = NULL;
+    APR_RING_NEXT(tail, link) = NULL;
+    return head;
+}
+
+
+
+static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
+{
+    apr_size_t n_dbg;
+    struct apr_thread_list_elt *elt, *head, *tail;
+    apr_status_t rv;
+
+    elt = trim_threads(me, &cnt, 1);
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_broadcast(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+
+    n_dbg = 0;
+    if (NULL != (head = elt)) {
+        while (elt) {
+            tail = elt;
+            apr_thread_join(&rv, elt->thd);
+            elt = APR_RING_NEXT(elt, link);
+            ++n_dbg;
+        }
+        apr_thread_mutex_lock(me->lock);
+        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
+                             apr_thread_list_elt, link);
+        apr_thread_mutex_unlock(me->lock);
+    }
+    assert(cnt == n_dbg);
+
+    return cnt;
+}
+
+
+/* don't join on busy threads, for performance reasons: there is no telling
+ * how long the task will take to complete
+ */
+static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
+{
+    trim_threads(me, &cnt, 0);
+    return cnt;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
+                                                     apr_size_t cnt)
+{
+    me->idle_max = cnt;
+    cnt = trim_idle_threads(me, cnt);
+    return cnt;
+}
+
+
+
+apr_interval_time_t etch_apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
+                                  apr_interval_time_t timeout)
+{
+    apr_interval_time_t oldtime;
+
+    oldtime = me->idle_wait;
+    me->idle_wait = timeout;
+
+    return oldtime;
+}
+
+
+
+apr_size_t etch_apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
+{
+    return me->thd_max;
+}
+
+
+/*
+ * This function stops surplus worker threads to bring the pool down to the
+ * new limit.
+ * NOTE: busy threads may become idle while this function runs
+ */
+apr_size_t etch_apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
+                                                       apr_size_t cnt)
+{
+    unsigned int n;
+
+    me->thd_max = cnt;
+    if (0 == cnt || me->thd_cnt <= cnt) {
+        return 0;
+    }
+
+    n = (unsigned) me->thd_cnt - cnt;
+    if (n >= me->idle_cnt) {
+        trim_busy_threads(me, n - me->idle_cnt);
+        trim_idle_threads(me, 0);
+    }
+    else {
+        trim_idle_threads(me, me->idle_cnt - n);
+    }
+    return n;
+}
+
+
+apr_size_t etch_apr_thread_pool_threshold_get(apr_thread_pool_t *me)
+{
+    return me->threshold;
+}
+
+
+apr_size_t etch_apr_thread_pool_threshold_set(apr_thread_pool_t *me,
+                                                      apr_size_t val)
+{
+    apr_size_t ov;
+
+    ov = me->threshold;
+    me->threshold = val;
+    return ov;
+}
+
+
+apr_status_t etch_apr_thread_pool_task_owner_get(apr_thread_t *thd,
+                                                         void **owner)
+{
+    apr_status_t rv;
+    apr_thread_pool_task_t *task;
+    void *data;
+
+    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    task = data;
+    if (!task) {
+        *owner = NULL;
+        return APR_BADARG;
+    }
+
+    *owner = task->owner;
+    return APR_SUCCESS;
+}
+
+
+/* vim: set ts=4 sw=4 et cin tw=80: */
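
For reference, a minimal sketch of driving the pool API above. This is
illustrative only: it assumes etch_apr_thread_pool_create() and
etch_apr_thread_pool_destroy() are defined earlier in this file with
signatures mirroring stock APR's apr_thread_pool, and my_task and
pool_example are hypothetical names.

    /* illustrative sketch only, not part of the committed file */
    static void* APR_THREAD_FUNC my_task(apr_thread_t* thd, void* param)
    {
        /* one unit of work; param carries the per-task data passed at queue time */
        return NULL;
    }

    static void pool_example(apr_pool_t* mempool)
    {
        apr_thread_pool_t* pool = NULL;
        /* assumed constructor: 0 initial threads, 4 maximum, parent pool */
        if (APR_SUCCESS != etch_apr_thread_pool_create(&pool, 0, 4, mempool))
            return;

        etch_apr_thread_pool_push(pool, my_task, (void*)1, 0, NULL);  /* FIFO  */
        etch_apr_thread_pool_top (pool, my_task, (void*)2, 0, NULL);  /* LIFO  */
        etch_apr_thread_pool_schedule(pool, my_task, (void*)3,
                                      500 * 1000, NULL);              /* 500ms */

        etch_apr_thread_pool_idle_max_set(pool, 1);    /* trim surplus idle threads */
        etch_apr_thread_pool_tasks_cancel(pool, NULL); /* cancel tasks owned by NULL */
        etch_apr_thread_pool_destroy(pool);            /* assumed destructor */
    }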

Added: incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/support/etch_transportint.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,130 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_transportint.c
+ * transport interface
+ */
+
+#include "etch_transportint.h"
+#include "etch_sessionint.h"
+#include "etch_global.h"
+
+int etchtransport_def_transport_control    (void*, void*, void*);
+int etchtransport_def_transport_notify     (void*, void*);
+objmask* etchtransport_def_transport_query (void*, void*);
+
+void* etchtransport_def_get_session(void* thisx);
+void  etchtransport_def_set_session(void* thisx, void* session);
+
+
+/**
+ * new_default_transport_interface
+ * return a transport interface populated with defaults for virtuals.
+ * caller owns returned object, not an etch object, use etch_free() to destroy.
+ */
+i_transport* new_default_transport_interface(void* thisx)
+{
+    i_transport* newi = etch_malloc(sizeof(i_transport), ETCHTYPEB_RAWOBJECT);
+
+    newi->transport_control = etchtransport_def_transport_control;
+    newi->transport_notify  = etchtransport_def_transport_notify;
+    newi->transport_query   = etchtransport_def_transport_query;
+    newi->get_session = etchtransport_def_get_session;
+    newi->set_session = etchtransport_def_set_session;
+    newi->thisx = thisx;
+    return newi;    
+}
+
+
+/**
+ * new_transport_interface
+ * return a transport interface populated with specified virtuals.
+ * caller owns returned object, not an etch object, use etch_free() to destroy.
+ */
+i_transport* new_transport_interface(void* thisx,
+  etch_transport_control sc, etch_transport_notify sn, etch_transport_query sq)
+{
+    i_transport* newi = new_default_transport_interface(thisx);
+
+    if (sc) newi->transport_control = sc;
+    if (sn) newi->transport_notify  = sn;
+    if (sq) newi->transport_query   = sq;
+    return newi;    
+}
+
+
+/**
+ * new_transport_interface_ex
+ * return a transport interface populated with specified virtuals.
+ * caller owns returned object, not an etch object, use etch_free() to destroy.
+ */
+i_transport* new_transport_interface_ex(void* thisx, 
+  etch_transport_control sc, etch_transport_notify sn, etch_transport_query sq,
+  etch_transport_get_session gs, etch_transport_set_session ss)
+{   
+    i_transport* newi = new_transport_interface(thisx, sc, sn, sq);
+
+    if (gs) newi->get_session = gs;
+    if (ss) newi->set_session = ss;
+    return newi;    
+}
+
+
+/**
+ * clone_transport() 
+ */
+i_transport* clone_transport(void* thisx, const i_transport* thattransport) 
+{
+    i_transport* newtransport 
+        = thattransport? new_default_transport_interface(thisx): NULL;
+
+    if (newtransport)
+        memcpy(newtransport, thattransport, sizeof(i_transport));
+     
+    return newtransport;
+}
+
+
+int etchtransport_def_transport_control (void* obj, void* evt, void* v)
+{
+    return -1;
+}
+
+
+int etchtransport_def_transport_notify  (void* obj, void* evt)
+{
+    return -1;
+}
+
+
+objmask* etchtransport_def_transport_query (void* obj, void* query) 
+{
+    return NULL;
+}
+
+
+void* etchtransport_def_get_session(void* thisx) 
+{
+    return NULL;
+}
+
+
+void etchtransport_def_set_session(void* thisx, void* session)
+{
+}
\ No newline at end of file
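
A minimal sketch of constructing and using the interface above. The override
my_transport_control and the owner pointer are hypothetical, and the
etch_transport_control typedef is assumed to match the defaults' shape of
int (*)(void*, void*, void*).

    /* illustrative sketch only, not part of the committed file */
    static int my_transport_control (void* thisx, void* evt, void* value)
    {
        /* react to a transport control event; 0 indicates success */
        return 0;
    }

    static void transport_example(void* owner)
    {
        /* virtuals passed as NULL keep the defaults installed above */
        i_transport* xp = new_transport_interface
            (owner, my_transport_control, NULL, NULL);

        xp->transport_control(xp->thisx, NULL, NULL);  /* dispatches the override */

        etch_free(xp);  /* caller owns the interface, per the comments above */
    }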

Added: incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/test/common/test_allocator.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,381 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * test_allocator.c -- test the debug allocator 
+ */
+#include "apr_time.h" /* some apr must be included first */
+#include "etchthread.h"
+#include <tchar.h>
+#include <stdio.h>
+#include <conio.h>
+#include "etchobj.h"
+#include "etch_global.h"
+
+#include "cunit.h"
+#include "basic.h"
+#include "automated.h"
+
+int apr_setup(void);
+int apr_teardown(void);
+int this_setup();
+int this_teardown();
+apr_pool_t* g_apr_mempool;
+const char* pooltag = "etchpool";
+
+
+/* - - - - - - - - - - - - - - 
+ * unit test infrastructure
+ * - - - - - - - - - - - - - -
+ */
+
+int init_suite(void)
+{
+    apr_setup();
+    etch_runtime_init(TRUE);
+    return this_setup();
+}
+
+int clean_suite(void)
+{
+    this_teardown();
+    etch_runtime_cleanup(0,0); /* free memtable and cache etc */
+    apr_teardown();
+    return 0;
+}
+
+int g_is_automated_test, g_bytes_allocated;
+
+#define IS_DEBUG_CONSOLE FALSE
+
+/*
+ * apr_setup()
+ * establish apache portable runtime environment
+ */
+int apr_setup(void)
+{
+    int result = apr_initialize();
+    if (result == 0)
+    {   result = etch_apr_init();
+        g_apr_mempool = etch_apr_mempool;
+    }
+    if (g_apr_mempool)
+        apr_pool_tag(g_apr_mempool, pooltag);
+    else result = -1;
+    return result;
+}
+
+/*
+ * apr_teardown()
+ * free apache portable runtime environment
+ */
+int apr_teardown(void)
+{
+    if (g_apr_mempool)
+        apr_pool_destroy(g_apr_mempool);
+    g_apr_mempool = NULL;
+    apr_terminate();
+    return 0;
+}
+
+int this_setup()
+{
+    etch_apr_mempool = g_apr_mempool;
+    return 0;
+}
+
+int this_teardown()
+{    
+    return 0;
+}
+
+
+#define PAYLOADSIG 0xf00f00
+#define NUMITEMS 3
+
+typedef struct payload
+{
+    int count;
+    int signature;
+} payload;
+
+
+/* 
+ * This subtest tests our ability to build a hashtable in which each key is a
+ * pointer to that key's value; in other words, the hashtable stores a void**
+ * as the key. This essentially models a debug allocator. 
+ */
+void malloc_test(void)
+{
+    /* Two pointers are stored in the hash table for each entry. To clarify,
+       *pointers* are what is copied into the hashtable, not values. It follows
+       that to use a pointer as a hash key, what must be stored in the
+       hashtable as the key is a pointer to that pointer.
+    */
+    char **pkey = 0; void *pval = 0; 
+     
+    payload** allocations[NUMITEMS];
+    int i, testresult = 0, count = 0;
+    etch_hashtable* myhash;
+    etch_hashitem   hashbucket;
+    etch_hashitem*  myentry = &hashbucket;
+
+    is_memtable_instance = TRUE;
+    myhash = new_hashtable(16);  /* create hash table */
+    is_memtable_instance = FALSE;
+    CU_ASSERT_PTR_NOT_NULL_FATAL(myhash); 
+
+    /* insert NUMITEMS allocations into the tracking table
+     */
+    for(i = 0; i < NUMITEMS; i++)
+    {
+        int      result = 0;
+        void*    pkey = 0;
+        payload* pval = 0;
+       
+        pkey = malloc(sizeof(void*));        /* allocate the key*    */        
+        pval = malloc(sizeof(payload));      /* allocate the value   */
+        pval->count = i; pval->signature = PAYLOADSIG; /* debug info */
+        memcpy(pkey, &pval, sizeof(void*));  /* copy value* into key */
+        allocations[i] = pkey;               /* save for iterating   */
+
+        result = myhash->vtab->insert(myhash->realtable, pkey, sizeof(void*), pval, sizeof(payload), 0, 0);
+        CU_ASSERT_EQUAL(result,0);
+    } 
+
+    /* ensure there are NUMITEMS entries in the tracking table*/
+    count = myhash->vtab->count(myhash->realtable, 0, 0);
+    CU_ASSERT_EQUAL_FATAL(count,NUMITEMS);
+
+    for(i = 0; i < NUMITEMS; i++)
+    {
+        int       result = 0;
+        payload*  pval = 0;
+        payload** pkey = (payload**) &allocations[i]; 
+
+        result = myhash->vtab->find(myhash->realtable, *pkey, sizeof(void*), NULL, &myentry); 
+        CU_ASSERT_EQUAL(result,0);
+
+        pval = (payload*)myentry->value;
+        CU_ASSERT_EQUAL(pval->signature, PAYLOADSIG);
+        CU_ASSERT_EQUAL(pval->count, i);
+    } 
+
+    /* 
+     * Clear the hashtable, asking it to free the memory at both the key and
+     * value pointers as it goes. Since we individually allocated both keys
+     * (each a pointer-sized void**) and values (sizeof(payload) bytes each),
+     * we can do so. If we do not crash, this mini-test succeeds.
+     * CUnit note: there is no easy way to test whether this succeeds, since
+     * the hashtable no longer exists if successful, and an attempt to
+     * reference it would elicit an exception.
+     */
+    myhash->vtab->clear(myhash->realtable, TRUE, TRUE, 0, 0);
+
+    g_bytes_allocated = etch_showmem(0,0);  /* verify all memory freed */
+    CU_ASSERT_EQUAL(g_bytes_allocated, 0);  
+    memtable_clear();  /* start fresh for next test */ 
+}
+
+
+/* 
+ * This subtest tests our implemented debug allocator. 
+ */
+void etch_malloc_test(void)
+{
+    payload* allocations[NUMITEMS];
+    int i = 0, testresult = 0, count = 0;
+    etch_hashitem   hashbucket;
+    etch_hashitem*  myentry = &hashbucket;
+
+    /* insert NUMITEMS allocations into the tracking table
+     */
+    for(i = 0; i < NUMITEMS; i++)
+    {
+        void*    pmem = 0;
+        payload* pval = 0;
+
+        pval = etch_malloc(sizeof(payload), 0); /* allocate value, get key */ 
+        CU_ASSERT_PTR_NOT_NULL_FATAL(pval); 
+
+        pval->count = i; pval->signature = PAYLOADSIG; /* debug info */
+        allocations[i] = pval;     /* save for iterating */
+    } 
+
+    /* ensure there are NUMITEMS entries in the tracking table */
+    CU_ASSERT_PTR_NOT_NULL_FATAL(memtable); 
+    count = memtable->vtab->count(memtable->realtable, 0, 0);  
+    CU_ASSERT_EQUAL_FATAL(count, NUMITEMS);   
+ 
+    for(i = 0; i < NUMITEMS; i++)
+    {
+        int n, result;
+        payload* pval = (payload*) allocations[i]; 
+        n = pval->count;
+
+        result = etch_free(pval);        
+        CU_ASSERT_EQUAL(result, 0);
+    } 
+
+    /* ensure there are zero entries in the tracking table */
+    CU_ASSERT_PTR_NOT_NULL_FATAL(memtable);
+    CU_ASSERT_EQUAL_FATAL(memtable->vtab->count(memtable->realtable, 0, 0), 0);
+
+    g_bytes_allocated = etch_showmem(0,0);  /* verify all memory freed */
+    CU_ASSERT_EQUAL(g_bytes_allocated, 0);  
+    memtable_clear();  /* start fresh for next test */  
+}
+
+
+/* 
+ * This subtest tests that the debug allocator reports un-freed memory as expected.
+ */
+void negative_etch_malloc_test(void)
+{
+    struct 
+    {   double d;
+    } EIGHTBYTES;
+    struct
+    {   char x[496];
+        int  n;
+    } FIVEHUNDREDBYTES;
+
+    void *p1=0, *p2=0;
+    int   bytes_allocated = 0;
+    memtable = NULL; /* ensure not dangling from a prior test */
+
+    p1 = etch_malloc(sizeof(EIGHTBYTES), 0); 
+    p2 = etch_malloc(sizeof(FIVEHUNDREDBYTES), 0);  
+
+    bytes_allocated = etch_showmem(0,0); /* get leaks - don't free them */
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES) + sizeof(FIVEHUNDREDBYTES));
+     
+    etch_free(p2);
+    bytes_allocated = etch_showmem(0,0); 
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+
+    etch_free(p1);
+
+    g_bytes_allocated = etch_showmem(0,0);  /* verify all memory freed */
+    CU_ASSERT_EQUAL(g_bytes_allocated, 0);  
+    memtable_clear();  /* start fresh for next test */ 
+}
+
+
+/* 
+ * This subtest tests etch_realloc() semantics: a NULL pointer acts as malloc,
+ * a zero size acts as free, and growing or shrinking preserves contents.
+ */
+void etch_realloc_test(void)
+{
+    struct 
+    {   double d;
+    } EIGHTBYTES;
+    struct
+    {   char x[496];
+        int  n;
+    } FIVEHUNDREDBYTES;
+
+    void *p1=0, *p2=0;
+    int   bytes_allocated = 0;    
+
+    /* same as malloc */
+    p1 = etch_realloc(NULL, sizeof(EIGHTBYTES), 0);
+    CU_ASSERT_PTR_NOT_NULL(p1);
+    bytes_allocated = etch_showmem(0,0); /* get leaks - don't free them */
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+
+    /* same as free */
+    p2 = etch_realloc(p1, 0, 0);
+    bytes_allocated = etch_showmem(0,0);  
+    CU_ASSERT_EQUAL(bytes_allocated, 0);
+
+    /* now expand the memory */
+    p1 = etch_realloc(NULL, sizeof(EIGHTBYTES), 0);
+    CU_ASSERT_PTR_NOT_NULL(p1);
+    bytes_allocated = etch_showmem(0,0);  
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+    *((long *) p1) = 1234;
+
+    p2 = etch_realloc(p1, sizeof(FIVEHUNDREDBYTES), 0);
+
+    CU_ASSERT_PTR_NOT_NULL(p2);
+    bytes_allocated = etch_showmem(0,0);  
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(FIVEHUNDREDBYTES));
+    CU_ASSERT( (*( (long *) p2) ) == 1234 );
+
+    etch_free(p2);
+
+
+    /* now shrink memory */
+    p1 = etch_realloc(NULL, sizeof(FIVEHUNDREDBYTES), 0);
+    CU_ASSERT_PTR_NOT_NULL(p1);
+    bytes_allocated = etch_showmem(0,0);  
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(FIVEHUNDREDBYTES));
+    *((long *) p1) = 1234;
+
+    p2 = etch_realloc(p1, sizeof(EIGHTBYTES), 0);
+
+    CU_ASSERT_PTR_NOT_NULL(p2);
+    bytes_allocated = etch_showmem(0,0);  
+    CU_ASSERT_EQUAL(bytes_allocated, sizeof(EIGHTBYTES));
+    CU_ASSERT( (*( (long *) p2) ) == 1234 );
+
+    etch_free(p2);
+
+    g_bytes_allocated = etch_showmem(0,0);  /* verify all memory freed */
+    CU_ASSERT_EQUAL(g_bytes_allocated, 0);  
+    memtable_clear();  /* start fresh for next test */  
+}
+
+
+/**
+ * main   
+ */
+int _tmain(int argc, _TCHAR* argv[])
+{
+    char c=0;
+    CU_pSuite pSuite = NULL;
+    /* any command line argument other than "-a" selects automated mode */
+    g_is_automated_test = argc > 1 && 0 != wcscmp(argv[1], L"-a");
+    if (CUE_SUCCESS != CU_initialize_registry()) return CU_get_error();
+
+    CU_set_output_filename("../test_allocator");
+    pSuite = CU_add_suite("suite_allocator", init_suite, clean_suite);
+
+    CU_add_test(pSuite, "hashtable pointer key test", malloc_test);
+    CU_add_test(pSuite, "debug realloc test", etch_realloc_test);  
+    CU_add_test(pSuite, "debug allocator test", etch_malloc_test);
+    CU_add_test(pSuite, "negative debug allocator test", negative_etch_malloc_test); 
+
+    if (g_is_automated_test)    
+        CU_automated_run_tests();    
+    else
+    {   CU_basic_set_mode(CU_BRM_VERBOSE);
+        CU_basic_run_tests();
+    }
+     
+    if (!g_is_automated_test) { printf("any key ..."); while(!c) c = _getch();  wprintf(L"\n"); }     
+    CU_cleanup_registry();
+    return CU_get_error(); 
+}
+
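
For reference, the leak-check idiom repeated at the end of each subtest above
could be captured in one helper; check_leaks is a hypothetical name, while
etch_showmem() and memtable_clear() are the entry points the tests exercise.

    /* illustrative sketch only, not part of the committed file */
    static int check_leaks(void)
    {
        int bytes = etch_showmem(0, 0); /* report outstanding bytes, don't free */
        memtable_clear();               /* start fresh for the next test */
        return bytes;                   /* zero means all memory was freed */
    }

    /* usage at the end of a subtest: CU_ASSERT_EQUAL(check_leaks(), 0); */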