You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@etch.apache.org by jd...@apache.org on 2009/04/22 19:25:51 UTC

svn commit: r767594 [36/43] - in /incubator/etch/trunk/binding-c/runtime/c: ./ ext/ ext/hashtab/ ext/lib/ inc/ lib/ project/ project/$etchstop/ project/bin/ project/etch/ project/logcli/ project/logsrv/ project/notes/ project/test/ project/test/logcli/...

Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,259 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_connection.c
+ * connection client and server classes - tcp, udp
+ */
+
+#include "apr_time.h"
+#include "apr_network_io.h"
+#include "etchthread.h"
+#include "etch_connection.h"
+#include "etch_encoding.h"
+#include "etchflexbuf.h"
+#include "etchlog.h"
+#include "etch_global.h"
+
+/* wide-character names of the tcp connection and tcp listener options.
+ * NOTE(review): presumably these are the keys under which the options are 
+ * looked up in a url or resources map - confirm against the callers.
+ */
+const wchar_t* ETCH_CONNECTION_RECONDELAY   = L"TcpConnection.reconnect_delay";
+const wchar_t* ETCH_CONNECTION_AUTOFLUSH    = L"TcpConnection.autoFlush";
+const wchar_t* ETCH_CONNECTION_KEEPALIVE    = L"TcpConnection.keepAlive";
+const wchar_t* ETCH_CONNECTION_LINGERTIME   = L"TcpConnection.lingerTime";
+const wchar_t* ETCH_CONNECTION_NODELAY      = L"TcpConnection.noDelay";
+const wchar_t* ETCH_CONNECTION_TRAFCLASS    = L"TcpConnection.trafficClass";
+const wchar_t* ETCH_CONNECTION_BUFSIZE      = L"TcpConnection.bufferSize";
+const wchar_t* ETCH_TCPLISTENER_BACKLOG     = L"TcpListener.backlog";
+char* ETCHCONX = "CONX";  /* passed as the first argument of etchlog() by the connection code */
+
+unsigned next_etch_connection_id();  /* forward declaration, defined further down this file */
+
+
+/*
+ * is_good_tcp_params()
+ * validate the parameters from which a tcp connection is to be built.
+ * when a socket is supplied the url is not examined. otherwise the url must
+ * be present, must have a host, and must have a port in the range 1..65535.
+ * @param url the parsed url, consulted only when socket is null.
+ * @param resources not used by this check.
+ * @param socket an existing socket, or null.
+ * @return nonzero (true) if the parameters are usable, zero (false) if not.
+ */
+int is_good_tcp_params(etch_url* url, void* resources, etch_rawsocket* socket)
+{
+    int whicherr = 0;   /* bit 0x1: no host, bit 0x2: port out of range */
+
+    if (NULL == socket)
+    {
+        if (NULL == url) return 0;   /* neither socket nor url: not usable */
+        if (NULL == url->host) whicherr |= 0x1; 
+        if (url->port <= 0 || url->port > 0xffff) whicherr |= 0x2; 
+    }
+
+    return whicherr == 0;
+}
+
+
+/*
+ * etch_socket_set_timeout
+ * set timeout for specified socket
+ * @param socket the apr socket on which to set the timeout.
+ * @param ms the timeout in milliseconds.
+ * @return 0 if apr accepted the timeout, otherwise -1.
+ */
+int etch_socket_set_timeout(etch_rawsocket* socket, const unsigned ms)
+{
+    /* apr takes microseconds in a 64-bit apr_interval_time_t. widen before
+     * scaling: evaluated as 32-bit unsigned, ms * 1000 wraps around once ms
+     * exceeds 4294967 (about 71 minutes) and a shorter timeout would be set. */
+    const apr_interval_time_t usecs = (apr_interval_time_t) ms * 1000;
+    const int result = apr_socket_timeout_set(socket, usecs);
+    return 0 == result? 0: -1;
+}
+
+
+/*
+ * etch_socket_get_timeout
+ * query the timeout currently set on the specified socket.
+ * @param socket the apr socket to query.
+ * @param retms out parameter receiving the timeout converted from apr's
+ * microseconds to milliseconds. may be null, in which case nothing is stored.
+ * @return 0 if apr returned the timeout, otherwise -1.
+ */
+int etch_socket_get_timeout(etch_rawsocket* socket, unsigned* retms)
+{
+    int64 usecs = 0;
+    const int status = apr_socket_timeout_get(socket, &usecs);
+
+    if (status != 0) 
+        return -1;
+
+    if (NULL != retms) 
+        *retms = (unsigned) (usecs / 1000);
+
+    return 0;  
+}
+
+
+/**
+ * etchconx_set_socket_options()
+ * the set_socket_options handler which etch_init_connection() installs on a 
+ * new connection. it sets no options.
+ * @param c the connection, not used.
+ * @return 0 always.
+ */
+int etchconx_set_socket_options(void *c)
+{
+    const int result = 0;  /* nothing is set, so nothing can fail */
+    return result;
+}
+
+
+/**
+ * next_etch_connection_id()
+ * return a unique ID used to identify a connection instance.
+ * the id is taken from the shared counter connection_id_farm. zero is never
+ * returned: when the counter wraps around to zero it is incremented again.
+ * @return the nonzero id claimed by this call.
+ */
+unsigned next_etch_connection_id()
+{
+    unsigned newid = 0;
+
+    /* apr_atomic_inc32() returns the value the counter held before the 
+     * increment, so the id claimed by this call is that value plus one. 
+     * the id is derived from the return value rather than by reading the 
+     * counter again, since another thread could increment the counter
+     * between our increment and our read, and both threads would then
+     * return the same id. */
+    do { newid = (unsigned) (apr_atomic_inc32 ((volatile apr_uint32_t*) &connection_id_farm) + 1);
+       } while(newid == 0); 
+
+    return newid;
+}
+
+
+/**
+ * etch_defconx_on_data()
+ * the receive data handler which etch_init_connection() installs on a new
+ * connection. it consumes nothing, it only logs a warning that data arrived 
+ * while no handler was provided.
+ * @param conx the etch_connection receiving the data, asserted to be such.
+ * @param unused not used.
+ * @param length count of bytes received, used in the log message only.
+ * @param data the received bytes, not used.
+ * @return 0 always.
+ */
+int etch_defconx_on_data (void* conx, const int unused, int length, char* data)
+{
+    etch_connection* const connection = (etch_connection*) conx;
+
+    ETCH_ASSERT(is_etch_connection(connection));
+
+    etchlog(ETCHCONX, ETCHLOG_WARNING, 
+        "connxn %d no data handler provided for %d bytes\n", 
+        connection->conxid, length);
+
+    return 0;
+}
+
+
+/*
+ * etch_defconx_on_event()
+ * the event handler which etch_init_connection() installs on a new
+ * connection. it ignores every event.
+ * @param c the connection, not used.
+ * @param e the event, not used.
+ * @param p1 not used.
+ * @param p2 not used.
+ * @return 0 always.
+ */
+int etch_defconx_on_event (void* c, const int e, int p1, void* p2)
+{
+    const int result = 0;  /* every event is accepted without action */
+    return result;
+}
+
+
+/*
+ * etchconx_wait_for() 
+ * block until condition is met or timeout. 
+ * delegates to the timed_waitequal() method of the connection's waiter, 
+ * passing it the address of the connection's waitcond variable.
+ * @param cx the connection. caller must supply non-null, with a waiter.
+ * @param waitval the value for which to wait.
+ * @param timeoutms the timeout, passed through to the waiter unchanged.
+ * @return the result of the waiter's timed_waitequal().
+ */
+int etchconx_wait_for (etch_connection* cx, const int64 waitval, const int timeoutms)
+{
+    etchwait* const w = cx->waiter;
+
+    return w->timed_waitequal (w, &cx->waitcond, waitval, timeoutms);
+}
+
+
+/*
+ * etchconx_init_waitstate() 
+ * point the waiter's condition variable at this connection's waitcond, in
+ * advance of any wait. 
+ * per the original author: this puts the wait state at "waiting", and is 
+ * needed before a wait up or wait down whenever the up or down could complete
+ * before the user gets to call the wait. for example a connection being 
+ * established may well come up on another thread before the user calls wait 
+ * up. pre-setting the condition variable gives the connection a target on 
+ * which to mark the state.
+ * @param cx the connection, asserted non-null and to have a waiter.
+ */
+void etchconx_init_waitstate (etch_connection* cx)
+{
+    ETCH_ASSERT(cx && cx->waiter);
+    (*cx->waiter).cond_var = &(cx->waitcond);   
+}
+
+
+/*
+ * etchconx_wait_up() 
+ * block until the connection's wait condition equals ETCH_CONXEVT_UP, 
+ * or until timeout.
+ * @param cx the connection. caller must supply non-null, with a waiter.
+ * @param timeoutms the timeout, passed through to etchconx_wait_for().
+ * @return the result of etchconx_wait_for().
+ */
+int etchconx_wait_up (etch_connection* cx, int timeoutms)
+{
+    etchwait* const w = cx->waiter;
+
+    /* zero rather than ETCH_CONXEVT_UP: labeled a "semikludge" by the author */
+    w->cond_waitfor = 0;  
+
+    return etchconx_wait_for(cx, ETCH_CONXEVT_UP, timeoutms);
+}
+
+
+/*
+ * etchconx_wait_down() 
+ * block until the connection's wait condition equals ETCH_CONXEVT_DOWN, 
+ * or until timeout.
+ * @param cx the connection. caller must supply non-null, with a waiter.
+ * @param timeoutms the timeout, passed through to etchconx_wait_for().
+ * @return the result of etchconx_wait_for().
+ */
+int etchconx_wait_down (etch_connection* cx, int timeoutms)
+{
+    etchwait* const w = cx->waiter;
+
+    /* zero rather than ETCH_CONXEVT_DOWN: labeled a "semikludge" by the author */
+    w->cond_waitfor = 0;  
+
+    return etchconx_wait_for(cx, ETCH_CONXEVT_DOWN, timeoutms);
+}
+
+
+/*
+ * new_etch_socket() 
+ * socket object wrapper constructor
+ * @param rawsocket the apr socket to wrap. it is stored, not copied.
+ * @return the new wrapper object, or null if the object was not allocated.
+ */
+etch_socket* new_etch_socket(etch_rawsocket* rawsocket)
+{
+    etch_socket* newobj = (etch_socket*) new_object (sizeof(etch_socket), 
+        ETCHTYPEB_SOCKET, CLASSID_ETCHSOCKET);
+
+    /* NOTE(review): guards against a null return from new_object() - confirm
+     * whether new_object() can in fact return null on allocation failure */
+    if (NULL == newobj) return NULL;
+
+    /* fyi this object uses the generic destructor since the rawsocket
+     * is allocated from the apr pool and not explicitly freed by etch */
+    newobj->value = rawsocket;
+    return newobj;
+}
+
+
+/*
+ * etch_destroy_connection() 
+ * free memory owned by connection: the waiter, the mutex, the host name, and
+ * the apr subpool if the connection created one. the etch_connection itself
+ * is not freed. each member destroyed is reset to null afterward, so that a
+ * second call on the same connection does not free anything twice.
+ * @param cx the connection, may be null.
+ * @return 0 always.
+ */
+int etch_destroy_connection(etch_connection* cx)
+{
+    if (NULL == cx) return 0;
+
+    if (cx->waiter)
+    {   destroy_etchwait (cx->waiter); 
+        cx->waiter = NULL;
+    }
+
+    if (cx->mutex)
+    {   destroy_etchmutex(cx->mutex);
+        cx->mutex = NULL;
+    }
+
+    if (cx->hostname)
+    {   etch_free(cx->hostname);
+        cx->hostname = NULL;
+    }
+
+    /* the pool goes last since waiter and mutex were created from it. a pool
+     * not owned by the connection is left alone, as is the pointer to it */
+    if (cx->is_ownpool && cx->aprpool) /* free apr subpool */
+    {   apr_pool_destroy(cx->aprpool);
+        cx->aprpool = NULL;
+        cx->is_ownpool = FALSE;
+    }
+
+    return 0;
+}
+
+
+/*
+ * etch_init_connection()  
+ * initialize an etch_connection (abstract base).
+ * the connection is zeroed, assigned a new connection id, given a memory 
+ * pool, a mutex and a waiter, and has the default handlers installed.
+ * @param cx the connection to initialize. caller must supply non-null. 
+ * prior content is overwritten.
+ * @param socket an existing apr socket whose pool the connection is to use,
+ * or null.
+ * @param owner stored as the connection's owner.
+ * @return 0 always.
+ */
+int etch_init_connection (etch_connection* cx, etch_rawsocket* socket, void* owner)
+{
+    memset(cx, 0, sizeof(etch_connection));
+
+    cx->sig    = ETCH_CONX_SIG;
+    cx->owner  = owner;
+    cx->conxid = next_etch_connection_id();
+
+    /* choose the memory pool for the APR objects belonging to this connection.
+     * first choice is the pool of the socket supplied, if any. second choice
+     * is a new subpool of the global pool, which the connection then owns and
+     * frees when destroyed. last resort is the global pool itself, not owned.
+     */
+    if (socket) 
+        cx->aprpool = apr_socket_pool_get(socket);
+
+    if (NULL == cx->aprpool)
+    {   
+        apr_pool_create(&cx->aprpool, etch_apr_mempool);
+
+        if (cx->aprpool)
+            cx->is_ownpool = TRUE;
+        else  /* no subpool was created */
+        {   cx->aprpool = etch_apr_mempool;
+            cx->is_ownpool = FALSE;
+        }
+    }
+
+    /* synchronization objects are allocated from the pool chosen above */
+    cx->mutex    = new_mutex(cx->aprpool, TRUE);  
+    cx->waiter   = new_wait (cx->aprpool); 
+
+    /* defaults, for the concrete connection to override */
+    cx->bufsize  = ETCH_CONX_DEFAULT_BUFSIZE;
+    cx->on_data  = etch_defconx_on_data;
+    cx->on_event = etch_defconx_on_event;
+    cx->set_socket_options = etchconx_set_socket_options;
+
+    return 0;
+}
+
+

Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,334 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_conxevent.c
+ * event handler overrides for the various connection superclasses
+ */
+
+#include "apr_network_io.h"
+#include "etchthread.h"
+#include "etch_tcpconxn.h"
+#include "etch_tcpserver.h"
+#include "etchlog.h"
+#include "etch_global.h"
+
+
+/*
+ * etch_deftcplistener_on_event()
+ * default handler for listener events.
+ * if a connection is supplied, the listener is recorded on the connection and
+ * the event is forwarded to the connection's own event handler. otherwise the
+ * event is one of the listener's own, and a message is logged for it.
+ * @param l the listener. caller must supply non-null if c is null.
+ * @param c the connection to which the event applies, or null.
+ * @param e the event, one of the ETCH_CONXEVT_xxxx constants.
+ * @param p1 event-specific, passed through to the connection's handler.
+ * @param p2 event-specific, passed through to the connection's handler.
+ * @return the result of the connection's handler if c was supplied, else 0.
+ */
+int etch_deftcplistener_on_event(etch_tcp_server* l, etch_tcp_connection* c, const int e, int p1, void* p2)
+{
+    /* an automatic buffer, not a static one, since a static buffer would be
+     * shared by all threads logging listener events at the same time. 
+     * "server " plus the digits of a 32-bit id plus terminator is 18 bytes */
+    char cxstr[24];  
+       
+    if (c)
+    {   c->cx.listener = (etch_object*) l;
+        return  c->cx.on_event(c, e, p1, p2);   
+    }
+
+    sprintf(cxstr, "server %u", l->listener_id);
+
+    switch(e)
+    {
+       case ETCH_CONXEVT_CREATED:
+            etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s created\n", cxstr);
+            break;
+
+       case ETCH_CONXEVT_CREATERR:
+            etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not created\n", cxstr);
+            break;
+
+       case ETCH_CONXEVT_OPENING:
+            etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s opening ...\n", cxstr);
+            break;
+
+       case ETCH_CONXEVT_DESTROYING:
+            etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroying ...\n", cxstr);
+            break;
+
+       case ETCH_CONXEVT_DESTROYED:
+            etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroyed\n", cxstr);
+            break;
+
+       case ETCH_CONXEVT_SHUTDOWN:
+            etchlog(ETCHCONX, ETCHLOG_WARNING, "%s shutdown request detected\n", cxstr);
+            break;
+
+       default:  /* other events are not logged for a listener */
+            break;
+    }
+
+    return 0;
+}
+
+
+/*
+ * etch_tcpconx_on_event()
+ * default handler for connection events. logs a message describing the event,
+ * prefixed by the connection id, and by the listener id if the connection's 
+ * listener is a tcp server having a nonzero id.
+ * @param c the connection raising the event. caller must supply non-null.
+ * @param e the event, one of the ETCH_CONXEVT_xxxx constants. an event not
+ * recognized here is ignored.
+ * @param p1 event-specific: an apr status for RECEIVERR, SENDERR, RCVPUMP_ERR
+ * and SOCKOPTERR; a reason selector for OPENERR, STARTERR and CLOSERR; the
+ * result for RCVPUMP_STOP; a value to be logged for SENDEND, CREATED,
+ * RCVPUMP_RECEIVING, ACCEPTPUMPEXIT and LISTENED.
+ * @param p2 event-specific: an integer carried in the pointer for RECEIVED,
+ * RCVPUMP_STOP, ACCEPTERR, OPENERR and CLOSERR; the name of the option, a 
+ * char*, for SOCKOPTERR; otherwise not used.
+ * @return -1 for RCVPUMP_ERR, ACCEPTERR, OPENERR, STARTERR, STOPERR, CLOSERR;
+ * p1 for RCVPUMP_STOP; 1 for SOCKOPTERR; 0 for all other events.
+ */
+int etch_tcpconx_on_event(etch_tcp_connection* c, const int e, int p1, void* p2)
+{
+    int result = 0, lid = 0;
+
+    /* automatic buffers, not static ones, since static buffers would be shared
+     * by all threads logging connection events at the same time. cxstr is 
+     * sized for "server " + 10 digits + " connxn " + 10 digits + terminator,
+     * 36 bytes, which the former 32 byte buffer could not hold */
+    char cxstr[40], estr[128]; 
+
+    if  (is_etch_tcpserver(c->cx.listener))
+         lid = ((etch_tcp_server*)(c->cx.listener))->listener_id;
+    if  (lid)
+         sprintf(cxstr, "server %u connxn %u", (unsigned) lid, c->cx.conxid);     
+    else sprintf(cxstr, "connxn %u", c->cx.conxid);
+
+    switch(e)
+    {
+        case ETCH_CONXEVT_RECEIVING:
+             etchlog(ETCHCONX, ETCHLOG_XDEBUG, "%s begin receive (block) ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_RECEIVERR:   /* p1 is the apr status */
+             apr_strerror(p1, estr, sizeof(estr));
+             if  (IS_ETCH_SOCKET_TIMEOUT(p1))
+                  etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s socket receive timed out\n", cxstr); 
+             else
+             if  (p1 == APR_OTHER_END_CLOSED)
+                  etchlog(ETCHCONX, ETCHLOG_WARNING,"%s connection was broken\n", cxstr);  
+             else etchlog(ETCHCONX, ETCHLOG_ERROR,  "%s apr_socket_recv() %s\n", cxstr, estr); 
+             break;
+
+        case ETCH_CONXEVT_RECEIVED:   /* "eod=%d", p1 */
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s end receive %d bytes\n", cxstr, (int)(size_t)p2);    
+             break;
+
+        case ETCH_CONXEVT_CONXCLOSED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s local connection closed\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_PEERCLOSED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s peer connection closed\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_RECEIVEND:
+             etchlog(ETCHCONX, ETCHLOG_XDEBUG, "%s exit receive loop\n", cxstr);                
+             break;
+
+        case ETCH_CONXEVT_SENDING:
+             etchlog(ETCHCONX, ETCHLOG_XDEBUG, "%s begin send\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_SENDERR:   /* p1 is the apr status */
+             apr_strerror(p1, estr, sizeof(estr));
+             if  (IS_ETCH_SOCKET_TIMEOUT(p1))
+                  etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s socket send timed out\n", cxstr); 
+             else etchlog(ETCHCONX, ETCHLOG_ERROR, "%s apr_socket_send() %s\n", cxstr, estr); 
+             break;
+
+        case ETCH_CONXEVT_SENDEND:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s end send %d bytes\n", cxstr, p1);                
+             break;
+
+        case ETCH_CONXEVT_RCVPUMP_START:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s start receive pump\n", cxstr);                
+             break;
+
+        case ETCH_CONXEVT_RCVPUMP_RECEIVING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s thread %d receiving ...\n", cxstr, p1);                
+             break;
+
+        case ETCH_CONXEVT_RCVPUMP_ERR:   /* p1 is the apr status */
+             result = -1;   
+             if  (p1 == APR_OTHER_END_CLOSED)
+                  etchlog(ETCHCONX, ETCHLOG_WARNING, "%s connection was broken\n", cxstr);  
+             else etchlog(ETCHCONX, ETCHLOG_ERROR, "%s receive failed\n", cxstr);  
+             break;
+
+        case ETCH_CONXEVT_RCVPUMP_STOP:   /* p1 is returned to the caller */
+             result = p1;
+             if (result >= 0)
+                 etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s receive pump on thread %d exited\n", 
+                    cxstr, (int) (size_t) p2);
+             else
+                 etchlog(ETCHCONX, ETCHLOG_ERROR, "%s receive pump abnormal exit\n", cxstr);
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroying accepted connection\n", cxstr);                    
+             break;
+
+        case ETCH_CONXEVT_ACCEPTING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accepting ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_ACCEPTERR:   /* p2 carries the apr status */
+             result = -1;
+             apr_strerror((apr_status_t)(size_t)p2, estr, sizeof(estr));
+             etchlog(ETCHCONX, ETCHLOG_ERROR, "%s apr_socket_accept() %s\n", cxstr, estr);
+             break;
+
+        case ETCH_CONXEVT_ACCEPTED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accepted\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_CREATED:
+             if (p1)
+                 etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s created for socket %x\n", cxstr, (unsigned) p1);
+             else
+                 etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s created\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_CREATERR:
+             etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not created\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_OPENING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s opening ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_OPENERR:   /* p1 selects the step which failed */
+             result = -1;            /* p2 carries the apr status, if any */
+
+             switch(p1)
+             {  case 0:
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not opened\n", cxstr);
+                    break;
+                case 2:
+                    apr_strerror((apr_status_t)(size_t)p2, estr, sizeof(estr));
+                    etchlog(ETCHCONX, ETCHLOG_WARNING,
+                         "%s socket connect: %s\n", cxstr, estr);
+                    break;
+                case 3:
+                    apr_strerror((apr_status_t)(size_t)p2, estr, sizeof(estr));
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, 
+                        "%s socket create: %s\n", cxstr, estr);
+                    break;
+                case 4:
+                    apr_strerror((apr_status_t)(size_t)p2, estr, sizeof(estr));
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, 
+                        "%s sockaddr info: %s\n", cxstr, estr);
+                    break;
+                case 5:
+                    apr_strerror((apr_status_t)(size_t)p2, estr, sizeof(estr));
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, 
+                        "%s socket bind: %s\n", cxstr, estr);
+                    break;
+                case 6:
+                    apr_strerror((apr_status_t)(size_t)p2, estr, sizeof(estr));
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, 
+                        "%s socket listen: %s\n", cxstr, estr);
+                    break;
+                default:  /* other selectors are not logged */
+                    break;
+             }
+             break;
+
+        case ETCH_CONXEVT_OPENED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s opened\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_STARTING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s starting ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_STARTERR:   /* p1 selects the reason */
+             result = -1;
+             switch(p1)
+             {  case 0:
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not started\n", cxstr);
+                    break;
+                case 1:                     
+                    etchlog(ETCHCONX, ETCHLOG_ERROR,
+                         "%s etch_threadpool.run()\n", cxstr);
+                    break;
+                default:  /* other selectors are not logged */
+                    break;
+             }
+             break;
+
+        case ETCH_CONXEVT_STARTED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s started\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_ACCEPTPUMPEXIT:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accept pump on thread %d exited\n", cxstr, p1);
+             break;
+
+        case ETCH_CONXEVT_ACCEPTPUMPEXITERR:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accept pump abnormal exit\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_LISTENED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s server id is %d\n", cxstr, p1);
+             break;
+
+        case ETCH_CONXEVT_UP:     /* recognized, but nothing is logged */
+        case ETCH_CONXEVT_DOWN:
+             break;
+
+        case ETCH_CONXEVT_SOCKOPTERR:   /* p1 is the apr status, p2 the option name */
+             result = 1; /* to increment counter */
+             apr_strerror(p1, estr, sizeof(estr));
+             etchlog(ETCHCONX, ETCHLOG_ERROR, 
+                "%s set socket option %s: %s\n", cxstr, (char*) p2, estr);
+             break;
+
+        case ETCH_CONXEVT_SHUTDOWN:
+             etchlog(ETCHCONX, ETCHLOG_WARNING, "%s shutdown request detected\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_STOPPING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s stopping ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_STOPERR:
+             result = -1;
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s not stopped\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_STOPPED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s stopped\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_CLOSING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s closing ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_CLOSERR:   /* p1 selects the reason */
+             result = -1;
+
+             switch(p1)
+             {  case 0:
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not closed\n", cxstr);
+                    break;
+                case 1:
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, "%s close when not open\n", cxstr);
+                    break;
+                case 2:
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, "%s concurrent close denied\n", cxstr);
+                    break;
+                case 3:
+                    etchlog(ETCHCONX, ETCHLOG_ERROR, 
+                        "%s apr_socket_close() error %d\n", cxstr, (int)(size_t)p2);
+                    break;
+                default:  /* other selectors are not logged */
+                    break;
+             }
+             break;
+
+        case ETCH_CONXEVT_CLOSED:    
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s closed\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_DESTROYING:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroying ...\n", cxstr);
+             break;
+
+        case ETCH_CONXEVT_DESTROYED:
+             etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroyed\n", cxstr);
+             break;
+
+        default:  /* an event not recognized here is ignored */
+             break;
+    }
+
+    return result;
+}
+

Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,120 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_mailboxmgr.c
+ * i_mailbox manager interface
+ */
+
+#include "etch_mailboxmgr.h"
+#include "etch_message.h"
+#include "etch_global.h"
+#include "etchlog.h"
+
+/* forward declarations of the destructor and of the default implementations
+ * which new_mailboxmgr_interface() installs on a new i_mailbox_manager. 
+ * the final parameter of etchmailboxmgr_def_transport_call is declared as
+ * i_mailbox**, to agree with the definition of that function in this file.
+ * NOTE(review): etchmailboxmgr_def_headersize is neither defined nor
+ * referenced in the part of this file reviewed - confirm it is still needed.
+ */
+int destroy_mailbox_manager_interface(i_mailbox_manager*);
+int etchmailboxmgr_def_headersize(i_mailbox_manager*);
+int etchmailboxmgr_def_transport_call(i_mailbox_manager*, etch_who*, etch_message*, i_mailbox**);
+int etchmailboxmgr_def_redeliver (i_mailbox_manager*, etch_who*, etch_message*);
+int etchmailboxmgr_def_unregister(i_mailbox_manager*, i_mailbox*);
+
+
+/**
+ * new_mailboxmgr_interface()
+ * i_mailbox_manager constructor.
+ * the mailbox manager methods are set to the default implementations in this
+ * file, and the transport methods are copied from the transportmessage 
+ * interface.
+ * @param thisx the mailbox manager object.
+ * @param itm transportmessage interface, can be null, in which case a default
+ * interface is created. 
+ * NOTE(review): formerly documented as "caller retains", however the 
+ * destructor in this file destroys the stored itm unless the object is 
+ * flagged as static content - confirm the intended ownership.
+ * @param ism sessionmessage interface, caller retains, can be null.
+ * this constructor neither stores nor otherwise uses it.
+ * @return the new interface object, or null if an allocation failed.
+ */
+i_mailbox_manager* new_mailboxmgr_interface(void* thisx, 
+    i_transportmessage* itm, i_sessionmessage* ism)  
+{
+    i_mailbox_manager* newi = (i_mailbox_manager*) new_object
+        (sizeof(i_mailbox_manager), ETCHTYPEB_MBOX_MANAGER, CLASSID_MBOX_MANAGER);
+
+    /* NOTE(review): guards against a null return from new_object() - confirm
+     * whether new_object() can in fact return null on allocation failure */
+    if (NULL == newi) return NULL;
+
+    newi->thisx   = thisx;
+    newi->clone   = clone_null;
+    newi->destroy = destroy_mailbox_manager_interface;  
+
+    newi->transport_call = etchmailboxmgr_def_transport_call;
+    newi->redeliver      = etchmailboxmgr_def_redeliver;
+    newi->unregister     = etchmailboxmgr_def_unregister;
+
+    newi->itm = itm? itm: new_transportmsg_interface(newi, NULL, NULL); 
+
+    if (NULL == newi->itm)  /* no itm from which to copy the transport methods */
+    {   destroy_mailbox_manager_interface(newi);
+        return NULL;
+    }
+
+    newi->transport_message = newi->itm->transport_message;
+    newi->transport_control = newi->itm->transport_control;
+    newi->transport_notify  = newi->itm->transport_notify;
+    newi->transport_query   = newi->itm->transport_query;
+    newi->get_session       = newi->itm->get_session;
+    newi->set_session       = newi->itm->set_session;
+
+    return newi;
+}
+
+
+/**
+ * destroy_mailbox_manager_interface()
+ * i_mailbox_manager destructor.
+ * a positive reference count is first decremented, and nothing is destroyed
+ * if the count is still positive afterward. otherwise the ism and itm 
+ * interfaces, where present, are destroyed, unless the object is flagged as 
+ * having static content, and finally the object itself is destroyed.
+ * @param mgr the interface object to destroy, may be null.
+ * @return -1 if mgr is null or if references remain, 
+ * otherwise the result of destroy_objectex().
+ */
+int destroy_mailbox_manager_interface(i_mailbox_manager* mgr)
+{
+    if (!mgr) 
+        return -1;
+
+    if (mgr->refcount > 0)   
+    {   
+        mgr->refcount--;
+
+        if (mgr->refcount > 0)   /* still referenced */
+            return -1;  
+    }
+
+    if (!is_etchobj_static_content(mgr))
+    {    
+        if (mgr->ism) mgr->ism->destroy(mgr->ism);
+        if (mgr->itm) mgr->itm->destroy(mgr->itm);
+    }
+            
+    return destroy_objectex((objmask*)mgr);
+}
+
+
+/**
+ * etchmailboxmgr_def_transport_call()
+ * default implementation of i_mailbox_manager::transport_call(), installed
+ * by new_mailboxmgr_interface(). it does nothing with its arguments.
+ * @param imbm the mailbox manager interface, not used.
+ * @param whoto not used.
+ * @param msg not used.
+ * @param out not used, and not written to.
+ * @return -1 always.
+ */
+int etchmailboxmgr_def_transport_call(i_mailbox_manager* imbm, etch_who* whoto, etch_message* msg, i_mailbox** out)
+{
+    const int result = -1;  /* the default does no work */
+    return result;
+} 
+
+
+/**
+ * etchmailboxmgr_def_redeliver()
+ * default implementation of i_mailbox_manager::redeliver(), installed
+ * by new_mailboxmgr_interface(). it does nothing with its arguments.
+ * @param imbm the mailbox manager interface, not used.
+ * @param whofrom not used.
+ * @param msg not used.
+ * @return -1 always.
+ */
+int etchmailboxmgr_def_redeliver(i_mailbox_manager* imbm, etch_who* whofrom, etch_message* msg)
+{
+    const int result = -1;  /* the default does no work */
+    return result;
+}
+
+
+/**
+ * etchmailboxmgr_def_unregister()
+ * default implementation of i_mailbox_manager::unregister(), installed
+ * by new_mailboxmgr_interface(). it does nothing with its arguments.
+ * @param imbm the mailbox manager interface, not used.
+ * @param mbox not used.
+ * @return -1 always.
+ */
+int etchmailboxmgr_def_unregister(i_mailbox_manager* imbm, i_mailbox* mbox)
+{
+    const int result = -1;  /* the default does no work */
+    return result;
+}
+

Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,450 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_messagizer.c
+ * messagizer accepts packets and translates them to messages,
+ * and it accepts messages and translates them to packets.
+ */
+
+#include "etch_messagizer.h"
+#include "etch_tdformat.h"
+#include "etchthread.h"
+#include "etchlog.h"
+/* log category tags for the messagizer: wide form is passed to etchlogw(), 
+ * narrow form to etchlog() */
+wchar_t* ETCHWMSZ = L"MSGZ";
+char*    ETCHMSGZ = "MSGZ";
+
+/* forward declarations of the messagizer destructor and of the functions
+ * installed as the messagizer's transport and session interface methods */
+int destroy_messagizer(etch_messagizer*);
+int      etch_msgizer_transport_message(etch_messagizer*, etch_who*, etch_message*);
+int      etch_msgizer_session_packet   (etch_messagizer*, etch_who*, etch_flexbuffer*);
+int      etch_msgizer_session_control  (etch_messagizer*, etch_event*, objmask*);
+int      etch_msgizer_session_notify   (etch_messagizer*, etch_event*);
+objmask* etch_msgizer_session_query    (etch_messagizer*, objmask*); 
+int      etch_msgizer_transport_control(etch_messagizer*, etch_event*, objmask*);
+int      etch_msgizer_transport_notify (etch_messagizer*, etch_event*);
+objmask* etch_msgizer_transport_query  (etch_messagizer*, objmask*); 
+
+
+/* - - - - - - - - - - - - - - -
+ * etch_messagizer
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * new_messagizer()
+ * etch_messagizer public constructor.
+ * parses the uri into a temporary etch_url, delegates construction to 
+ * new_messagizer_a(), and destroys the temporary url before returning.
+ * @param ixp transport interface owned by caller.
+ * @param uri a URI string owned by caller.
+ * @param resx a resources map owned by caller.
+ * @return the new messagizer, or null if the url object could not be 
+ * created or if new_messagizer_a() failed.
+ */
+
+etch_messagizer* new_messagizer (i_transportpacket* ixp, wchar_t* uri, etch_resources* resx)
+{
+    etch_messagizer* messagizer = NULL;
+    etch_url* url = new_url(uri);   
+
+    /* defensive: do not dereference a url which could not be constructed */
+    if (NULL == url) return NULL;
+
+    messagizer = new_messagizer_a(ixp, url, resx);  
+
+    url->destroy(url);
+    return messagizer;
+}
+
+
+/**
+ * new_messagizer_a()
+ * etch_messagizer private constructor.
+ * looks up the tagged data format (first in the url terms, then in the 
+ * resources map) and the value factory (resources map only), builds the 
+ * messagizer with its transport and session interfaces, creates the tagged 
+ * data input and output objects, and lastly installs the messagizer's 
+ * session interface on the next lower layer.
+ * @param ixp transport interface of the next lower layer, owned by caller.
+ * @param url a parsed URL owned by caller.
+ * @param resxmap a resources map owned by caller, can be null, however the 
+ * value factory is only looked up here, so construction fails without it.
+ * @return the new messagizer, or null on failure.
+ */
+etch_messagizer* new_messagizer_a (i_transportpacket* ixp, etch_url* url, etch_resources* resxmap)
+{
+    tagdata_format_factory* formatfactory = NULL;
+    etch_messagizer* messagizer = NULL;
+    i_transport* itransport = NULL;
+    etch_value_factory* vf  = NULL;
+    etch_string* formatobj  = NULL;
+    i_session* isession = NULL;
+    etchmutex* mutex = NULL;
+    int result = -1;
+
+    do
+    {   formatobj = (etch_string*) etchurl_get_term (url, ETCH_RESXKEY_MSGIZER_FORMAT);
+
+        if (NULL == formatobj && NULL != resxmap)
+            formatobj = (etch_string*) etch_resources_get (resxmap, ETCH_RESXKEY_MSGIZER_FORMAT);
+
+        if (NULL == formatobj)
+        {   etchlogw(ETCHWMSZ, ETCHLOG_ERROR, L"URI missing format specifier '%s'\n", 
+            ETCH_RESXKEY_MSGIZER_FORMAT); 
+            break;
+        }
+
+        formatfactory = get_format_factory(formatobj->v.valw);
+        if (NULL == formatfactory) 
+        {   etchlogw(ETCHWMSZ, ETCHLOG_ERROR, L"no format '%s' is installed\n", 
+            formatobj->v.valw); 
+            break;
+        }
+
+        vf = resxmap? (etch_value_factory*) etch_resources_get(resxmap,
+                          ETCH_RESXKEY_MSGIZER_VALUFACT): NULL;
+        if (NULL == vf)
+        {   etchlogw(ETCHWMSZ, ETCHLOG_ERROR, L"no value factory '%s' is installed\n", 
+            ETCH_RESXKEY_MSGIZER_VALUFACT); 
+            break;
+        }
+
+        if (NULL == (mutex = new_mutex(etch_apr_mempool, ETCHMUTEX_NESTED))) break;
+
+        /* - - - - - - - - - - - - - - -
+         * etch_messagizer
+         * - - - - - - - - - - - - - - -
+         */
+        messagizer = (etch_messagizer*) new_object
+            (sizeof(etch_messagizer), ETCHTYPEB_MESSAGIZER, CLASSID_MESSAGIZER);
+
+        messagizer->destroy = destroy_messagizer;
+        messagizer->clone   = clone_null;
+        messagizer->msglock = mutex;  /* messagizer now owns the mutex */
+        messagizer->msgbuf  = new_flexbuffer(ETCH_DEFSIZE);  /* 2K default */
+
+        /* set our transport to that of next lower layer (packetizer) */
+        messagizer->transport = ixp;  /* i_transportpacket owned by caller */
+
+        /* - - - - - - - - - - - - - - -
+         * i_transportmessage
+         * - - - - - - - - - - - - - - -
+         */
+        itransport = new_transport_interface_ex (messagizer,  
+            (etch_transport_control) etch_msgizer_transport_control, 
+            (etch_transport_notify)  etch_msgizer_transport_notify, 
+            (etch_transport_query)   etch_msgizer_transport_query,
+             etch_msgizer_get_session, 
+             etch_msgizer_set_session);
+
+        /* instantiate i_transportmessage interface which messagizer implements */
+        messagizer->transportmsg = new_transportmsg_interface(messagizer, 
+            etch_msgizer_transport_message, 
+            itransport);  /* transportmsg now owns itransport */
+
+        /* copy i_transportmessage interface methods up to messagizer */
+        messagizer->transport_message = etch_msgizer_transport_message;
+        messagizer->transport_control = itransport->transport_control;
+        messagizer->transport_notify  = itransport->transport_notify;
+        messagizer->transport_query   = itransport->transport_query;
+        messagizer->get_session       = itransport->get_session;
+        messagizer->set_session       = itransport->set_session;     
+
+        /* - - - - - - - - - - - - - - -
+         * i_sessionpacket
+         * - - - - - - - - - - - - - - -
+         */
+        isession = new_session_interface(messagizer,  
+            (etch_session_control) etch_msgizer_session_control, 
+            (etch_session_notify)  etch_msgizer_session_notify, 
+            (etch_session_query)   etch_msgizer_session_query);
+
+        /* instantiate i_sessionpacket interface which messagizer implements */
+        messagizer->sessionpkt = new_sessionpkt_interface(messagizer, 
+            etch_msgizer_session_packet, 
+            isession);  /* sessionpkt now owns isession */
+
+        /* copy session interface to parent */
+        messagizer->session_packet  = etch_msgizer_session_packet;
+        messagizer->session_control = isession->session_control;
+        messagizer->session_notify  = isession->session_notify;
+        messagizer->session_query   = isession->session_query;
+
+        /* - - - - - - - - - - - - - - -
+         * tagged data in out
+         * - - - - - - - - - - - - - - -
+         * these are created before the lower layer is hooked up to us, in 
+         * order that a failure here can not leave the lower layer holding 
+         * a pointer to a session interface which is destroyed below.
+         */
+        messagizer->tdi = formatfactory->new_tagdata_input(vf);
+        if (NULL == messagizer->tdi) break;
+
+        messagizer->tdo = formatfactory->new_tagdata_output(vf);
+        if (NULL == messagizer->tdo) break;
+
+        /* finally set session of next lower layer to our session */
+        /* fyi we must pass the implementor of transport as thisx, i.e. packetizer */
+        messagizer->transport->set_session (messagizer->transport->thisx, messagizer->sessionpkt);
+        
+        result = 0;
+
+    } while(0);
+
+
+    if (-1 == result)
+    {   
+        /* NOTE(review): vf was fetched from the caller-owned resources map,
+         * yet it is destroyed here on failure - confirm intended ownership. */
+        if (vf)
+            vf->destroy(vf);
+
+        if (messagizer)
+            messagizer->destroy(messagizer);
+
+        messagizer = NULL;
+    } 
+
+    /* the format factory is only needed during construction */
+    if (formatfactory)
+        formatfactory->destroy(formatfactory);
+
+    return messagizer;
+}
+
+
+/**
+ * destroy_messagizer()
+ * destructor for etch_messagizer.
+ * when the object is reference counted, one reference is released, and the
+ * object is torn down only once no references remain. the transport of the 
+ * next lower layer is not destroyed here.
+ * @param thisx the messagizer to destroy, can be null.
+ * @return -1 if thisx is null or if references remain after the decrement,
+ * otherwise the result of destroy_objectex().
+ */
+int destroy_messagizer (etch_messagizer* thisx)
+{
+    /* null guard, consistent with the other destructors in this layer */
+    if (NULL == thisx) return -1;
+    if (thisx->refcount > 0 && --thisx->refcount > 0) return -1;  
+
+    if (!is_etchobj_static_content(thisx))
+    {   
+        if (thisx->transportmsg)
+            thisx->transportmsg->destroy(thisx->transportmsg);
+
+        if (thisx->sessionpkt)
+            thisx->sessionpkt->destroy(thisx->sessionpkt);
+
+        if (thisx->tdi)
+            thisx->tdi->destroy(thisx->tdi);
+
+        if (thisx->tdo)
+            thisx->tdo->destroy(thisx->tdo);
+
+        if (thisx->msgbuf)
+            thisx->msgbuf->destroy(thisx->msgbuf);
+
+        if (thisx->msglock)
+            thisx->msglock->destroy(thisx->msglock);
+    }
+
+   return destroy_objectex((objmask*) thisx);
+}
+
+
+/**
+ * etch_msgizer_get_transport()
+ * @return a non-owning reference to the i_transportpacket interface which 
+ * was supplied to the messagizer constructor.
+ */
+i_transportpacket* etch_msgizer_get_transport (etch_messagizer* thisx)
+{
+    i_transportpacket* lower = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    lower = thisx->transport;
+    return lower;
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_transportmessage
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etch_msgizer_transport_message()
+ * i_transportmessage::transport_message override.
+ * under the messagizer lock, serializes the message into the messagizer's 
+ * buffer, leaving room at the front for the packet header, and hands the 
+ * buffer to the transport of the next lower layer.
+ * @param whoto recipient - caller retains this memory, can be null.
+ * @param msg the message. this function destroys it on success; 
+ * on failure the caller retains it.
+ * @return 0 success, -1 error.
+ */
+int etch_msgizer_transport_message (etch_messagizer* thisx, etch_who* whoto, 
+    etch_message* msg)
+{  
+    int result = 0;
+    int hdrlen = 0;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    thisx->msglock->acquire(thisx->msglock);
+
+    /* reserve space at the front of the buffer for the packet header.
+     * NOTE(review): get_headersize is passed the interface itself, while the
+     * other transport calls pass transport->thisx - confirm this is intended. */
+    hdrlen = thisx->transport->get_headersize (thisx->transport);
+    etch_flexbuf_skip (thisx->msgbuf, hdrlen, TRUE);
+
+    /* serialize the message to the buffer */
+    result = thisx->tdo->vtab->write_message (thisx->tdo, msg, thisx->msgbuf);
+
+    if (-1 != result)
+    {
+        /* rewind to the start of the buffer, where the header space begins */
+        etch_flexbuf_set_index (thisx->msgbuf, 0);
+
+        /* deliver packet buffer to transport */    /* msgizer->itp->pktizer */
+        result = thisx->transport->transport_packet (thisx->transport->thisx, 
+            whoto, thisx->msgbuf);
+
+        /* message is relinquished only once it has been delivered */
+        if (0 == result)
+            msg->destroy(msg);
+    }
+
+    thisx->msglock->release(thisx->msglock);
+    return result;
+}
+
+
+/**
+ * etch_msgizer_get_session()
+ * @return the i_sessionmessage reference most recently stored by 
+ * etch_msgizer_set_session(). the messagizer does not own this object.
+ */
+i_sessionmessage* etch_msgizer_get_session (etch_messagizer* thisx) 
+{
+    i_sessionmessage* current = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    current = thisx->session;
+    return current;
+}
+
+
+/**
+ * etch_msgizer_set_session()
+ * stores the session interface to which the messagizer forwards inbound 
+ * messages and session events.
+ * @param session an i_sessionmessage reference. caller owns this object;
+ * only the pointer is stored.
+ */
+void etch_msgizer_set_session (etch_messagizer* thisx, i_sessionmessage* session)
+{
+    /* validate the receiver first, then the interface being installed */
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    ETCH_ASSERT(is_etch_sessionmsg(session));
+
+    thisx->session = session;
+}
+
+
+/**
+ * etch_msgizer_transport_control()
+ * i_transportmessage::transport_control override.
+ * forwards the control event unchanged to the transport of the next lower layer.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ * @return the result of the lower layer's transport_control().
+ */
+int etch_msgizer_transport_control (etch_messagizer* thisx, etch_event* control, objmask* value)
+{
+    i_transportpacket* lower = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    lower = thisx->transport;
+    /* the implementor of the interface (lower->thisx) is passed as receiver */
+    return lower->transport_control (lower->thisx, control, value);
+}
+
+
+/**
+ * etch_msgizer_transport_notify()
+ * i_transportmessage::transport_notify override.
+ * forwards the event unchanged to the transport of the next lower layer.
+ * @param evt event, caller relinquishes.
+ * @return the result of the lower layer's transport_notify().
+ */
+int etch_msgizer_transport_notify (etch_messagizer* thisx, etch_event* evt)
+{
+    i_transportpacket* lower = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    lower = thisx->transport;
+    return lower->transport_notify (lower->thisx, evt);
+}
+
+
+/**
+ * etch_msgizer_transport_query()
+ * i_transportmessage::transport_query override.
+ * forwards the query unchanged to the transport of the next lower layer.
+ * @param query, caller relinquishes.
+ * @return the result of the lower layer's transport_query().
+ */
+objmask* etch_msgizer_transport_query (etch_messagizer* thisx, objmask* query) 
+{
+    i_transportpacket* lower = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    lower = thisx->transport;
+    return lower->transport_query (lower->thisx, query);
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_sessionpacket
+ * - - - - - - - - - - - - - - -
+ */
+  
+/**
+ * etch_msgizer_session_packet()
+ * i_sessionpacket::session_packet override.
+ * delivers data to the session from the transport: deserializes a message
+ * from the packet buffer and passes it up to the messagizer's session. 
+ * @param whofrom who sent the packet.
+ * caller retains memory for this object, can be null.
+ * @param fbuf the packet from the packet source.
+ * caller retains memory for this object. 
+ * @return 0 if a message was read and passed to the session (whether or not
+ * the session handled it), -1 if no message could be read from the buffer 
+ * or if no session has been set on the messagizer.
+ */
+int etch_msgizer_session_packet (etch_messagizer* thisx, etch_who* whofrom, etch_flexbuffer* fbuf)
+{
+    etch_message* msg = NULL;
+    int is_message_handled = FALSE;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+
+    /* create an etch message from the packetized data */
+    msg = thisx->tdi->vtab->read_message (thisx->tdi, fbuf);
+    
+    if (NULL == msg) return -1;
+
+    /* without a session there is nowhere to deliver the message. we still 
+     * own msg at this point, so destroy it rather than leak it. */
+    if (NULL == thisx->session)
+    {   etchlog (ETCHMSGZ, ETCHLOG_ERROR, "no session set, message discarded\n");
+        msg->destroy(msg);
+        return -1;
+    }
+
+    /* send the new message up via session.
+     * memory management rules are: if session_message() handles the message, 
+     * it owns msg memory. otherwise (if not handled), msg memory is owned by  
+     * the unwanted_message wrapper created here, which itself is owned by the 
+     * session_notify() destination.
+     */
+    is_message_handled /* call mailbox mgr to queue the new message up to a mailbox */
+        = (0 == thisx->session->session_message (thisx->session->thisx, whofrom, msg));
+
+    /* if the message was not handled, e.g. the message is an exception returned 
+     * from a one-way message and there was therefore no mailbox to receive it, 
+     * forward the message to the session via mailbox manager.
+     */
+    if (!is_message_handled)  
+    {   etchlog (ETCHMSGZ, ETCHLOG_DEBUG, "unable to post message to a mailbox\n");
+        etchlog (ETCHMSGZ, ETCHLOG_DEBUG, "deferring '%s' to session\n", message_aname(msg));
+        /* the result of session_notify() is deliberately not propagated */
+        thisx->session->session_notify (thisx->session->thisx, new_unwanted_message(whofrom, msg));
+    }
+
+    /* regardless we have relinquished msg at this point. it is now either queued up 
+     * in a mailbox, or forwarded in the unwanted message wrapper above. */
+    return 0;
+}
+
+
+/**
+ * etch_msgizer_session_control()
+ * i_sessionpacket::session_control override.
+ * forwards the control event unchanged to the messagizer's session.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ * @return the result of the session's session_control().
+ */
+int etch_msgizer_session_control (etch_messagizer* thisx, etch_event* control, objmask* value)
+{
+    i_sessionmessage* upper = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    upper = thisx->session;   /* as installed by etch_msgizer_set_session() */
+    return upper->session_control (upper->thisx, control, value);
+}
+
+
+/**
+ * etch_msgizer_session_notify()
+ * i_sessionpacket::session_notify override.
+ * forwards the event unchanged to the messagizer's session.
+ * @param evt event, caller relinquishes.
+ * @return the result of the session's session_notify().
+ */
+int etch_msgizer_session_notify  (etch_messagizer* thisx, etch_event* evt)
+{
+    i_sessionmessage* upper = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    upper = thisx->session;   /* as installed by etch_msgizer_set_session() */
+    return upper->session_notify (upper->thisx, evt);
+}
+
+
+/**
+ * etch_msgizer_session_query()
+ * i_sessionpacket::session_query override.
+ * forwards the query unchanged to the messagizer's session.
+ * @param query, caller relinquishes.
+ * @return the result of the session's session_query().
+ */
+objmask* etch_msgizer_session_query (etch_messagizer* thisx, objmask* query) 
+{
+    i_sessionmessage* upper = NULL;
+    ETCH_ASSERT(is_etch_messagizer(thisx));
+    upper = thisx->session;   /* as installed by etch_msgizer_set_session() */
+    return upper->session_query (upper->thisx, query);
+}
+

Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,506 @@
+/* $Id$ 
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with  
+ * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version  
+ * 2.0 (the "License"); you may not use this file except in compliance  
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ */ 
+
+/*
+ * etch_packetizer.c
+ * packetizes a stream data source. reads and verifies a packet header consisting
+ * of a 32-bit signature and a 32-bit length. once the signature is verified, uses 
+ * the length extracted from the header to read the packet data, and forwards that 
+ * data to the packet handler. as a packet source, accepts a packet and prepends 
+ * a packet header prior to delivering it to a data source.
+ */
+
+#include "apr_network_io.h"
+#include "etch_packetizer.h"
+#include "etch_global.h"
+#include "etch_url.h"
+#include "etchlog.h"
+
+/* log category tags for the packetizer, in wide and narrow form */
+wchar_t* ETCHPKZRW = L"PKZR";
+char*    ETCHPKZR  = "PKZR";
+
+/* forward declarations of the packetizer destructor, private constructor, and 
+ * the functions installed as the packetizer's transport and session methods */
+int destroy_packetizer(etch_packetizer*);
+etch_packetizer* new_packetizer_b (i_transportdata*, const int maxpktsize);
+int      etch_pktizer_transport_packet (etch_packetizer*, etch_who*, etch_flexbuffer*);
+int      etch_pktizer_session_data     (etch_packetizer*, etch_who*, etch_flexbuffer*);
+int      etch_pktizer_session_control  (etch_packetizer*, etch_event*, objmask*);
+int      etch_pktizer_session_notify   (etch_packetizer*, etch_event*);
+objmask* etch_pktizer_session_query    (etch_packetizer*, objmask*); 
+int      etch_pktizer_transport_control(etch_packetizer*, etch_event*, objmask*);
+int      etch_pktizer_transport_notify (etch_packetizer*, etch_event*);
+objmask* etch_pktizer_transport_query  (etch_packetizer*, objmask*); 
+
+/* name of the url term from which the maximum packet size is read */
+const wchar_t* ETCH_PKTIZER_MAX_PKT_SIZE_TERM = L"Packetizer.maxPktSize";
+/* maximum packet size used when the url supplies none, or a non-positive one */
+const int ETCH_PKTIZER_DEFMAXPKTSIZE = 10240;
+/* byte length of the packet header: 32-bit signature plus 32-bit length */
+const int ETCH_PKTIZER_HEADERSIZE = 8;
+/* signature which begins each packet header.
+ * NOTE(review): 0xdeadbeef does not fit a 32-bit signed int, so this 
+ * initializer relies on implementation-defined conversion - confirm. */
+const int ETCH_PKTIZER_SIG = 0xdeadbeef;
+
+
+/* - - - - - - - - - - - - - - - - - - - - - - 
+ * packetizer constructor/destructor
+ * - - - - - - - - - - - - - - - - - - - - - -
+ */
+
+/**
+ * new_packetizer()
+ * etch_packetizer public constructor.
+ * parses the uri into a temporary etch_url, delegates construction to 
+ * new_packetizer_a(), and destroys the temporary url before returning.
+ * @param itd i_transportdata interface owned by caller.
+ * @param uri a URI string owned by caller.
+ * @param resxmap a resources map owned by caller.
+ * @return the new packetizer, or null if the url object could not be 
+ * created or if new_packetizer_a() failed.
+ */
+etch_packetizer* new_packetizer (i_transportdata* itd, wchar_t* uri, etch_resources* resxmap) 
+{
+    etch_packetizer* packetizer = NULL;
+    etch_url* url = new_url(uri);   
+
+    /* defensive: do not dereference a url which could not be constructed */
+    if (NULL == url) return NULL;
+
+    packetizer = new_packetizer_a (itd, url, resxmap);  
+
+    url->destroy(url);
+    return packetizer;
+}
+
+
+/**
+ * new_packetizer_a()
+ * etch_packetizer constructor 2.
+ * reads the maximum packet size from the url term named by 
+ * ETCH_PKTIZER_MAX_PKT_SIZE_TERM, substituting ETCH_PKTIZER_DEFMAXPKTSIZE 
+ * when the term can not be read or is not positive, then delegates to 
+ * new_packetizer_b().
+ * @param itd i_transportdata interface of the next lower layer, not owned.
+ * @param url a parsed URL owned by caller.
+ * @param resxmap a resources map owned by caller. not consulted here.
+ * @return the result of new_packetizer_b().
+ */
+etch_packetizer* new_packetizer_a (i_transportdata* itd, etch_url* url, etch_resources* resxmap)
+{
+    int maxpktsize = 0;
+    const int result = etchurl_get_integer_term (url, ETCH_PKTIZER_MAX_PKT_SIZE_TERM, &maxpktsize);
+
+    if (-1 == result || maxpktsize <= 0) 
+        maxpktsize = ETCH_PKTIZER_DEFMAXPKTSIZE;
+
+    return new_packetizer_b (itd, maxpktsize);
+}
+
+
+/**
+ * new_packetizer_b()
+ * etch_packetizer private constructor.
+ * builds the packetizer together with the i_transportpacket and i_sessiondata
+ * interfaces which it implements, then installs its session interface on the 
+ * next lower layer.
+ * @param itd the transport interface of the next lower layer of the stack,
+ * that being the connection, e.g. etch_tcp_connection. not owned.
+ * @param maxpktsize maximum number of bytes in a packet. a non-positive 
+ * value selects the default (currently 10240).
+ * @return the new packetizer, or null if the mutex could not be created
+ * (only possible when ETCHPZR_HAS_MUTEX is enabled).
+ */
+etch_packetizer* new_packetizer_b (i_transportdata* itd, const int maxpktsize)
+{
+    etch_packetizer* pzr = NULL;
+    i_transport* xportbase = NULL;
+    i_session* sessbase = NULL;
+    etchmutex* lock = NULL;
+    ETCH_ASSERT(is_etch_transportdata(itd));
+
+    #if(ETCHPZR_HAS_MUTEX)
+    lock = new_mutex(etch_apr_mempool, ETCHMUTEX_NESTED);
+    if (NULL == lock) return NULL;  /* nothing else is allocated yet */
+    #endif
+
+    /* - - - - - - - - - - - - - - -
+     * etch_packetizer
+     * - - - - - - - - - - - - - - -
+     */
+    pzr = (etch_packetizer*) new_object
+        (sizeof(etch_packetizer), ETCHTYPEB_PACKETIZER, CLASSID_PACKETIZER);
+
+    pzr->destroy  = destroy_packetizer;
+    pzr->clone    = clone_null;
+    pzr->datalock = lock;  /* remains null when the mutex is compiled out */
+
+    pzr->headersize    = ETCH_PKTIZER_HEADERSIZE;
+    pzr->is_wantheader = TRUE;  /* the first bytes received are a header */
+
+    if (maxpktsize > 0)
+        pzr->maxpacketsize = maxpktsize;
+    else
+        pzr->maxpacketsize = ETCH_PKTIZER_DEFMAXPKTSIZE;
+
+    pzr->savebuf = new_flexbuffer(ETCH_DEFSIZE); /* 2K default */
+
+    /* our transport is that of the next lower layer (connection) */
+    pzr->transport = itd;  /* not owned */
+
+    /* - - - - - - - - - - - - - - -
+     * i_transportpacket
+     * - - - - - - - - - - - - - - -
+     */
+    xportbase = new_transport_interface_ex (pzr,
+        (etch_transport_control) etch_pktizer_transport_control,
+        (etch_transport_notify)  etch_pktizer_transport_notify,
+        (etch_transport_query)   etch_pktizer_transport_query,
+         etch_pktizer_get_session,
+         etch_pktizer_set_session);
+
+    /* the i_transportpacket interface which packetizer implements */
+    pzr->transportpkt = new_transportpkt_interface (pzr,
+        etch_pktizer_transport_packet,
+        xportbase);  /* transportpkt now owns xportbase */
+
+    /* mirror the i_transportpacket methods on the packetizer itself */
+    pzr->transport_packet  = etch_pktizer_transport_packet;
+    pzr->transport_control = xportbase->transport_control;
+    pzr->transport_notify  = xportbase->transport_notify;
+    pzr->transport_query   = xportbase->transport_query;
+    pzr->get_session       = xportbase->get_session;
+    pzr->set_session       = xportbase->set_session;
+
+    /* - - - - - - - - - - - - - - -
+     * i_sessiondata
+     * - - - - - - - - - - - - - - -
+     */
+    sessbase = new_session_interface (pzr,
+        (etch_session_control) etch_pktizer_session_control,
+        (etch_session_notify)  etch_pktizer_session_notify,
+        (etch_session_query)   etch_pktizer_session_query);
+
+    /* the i_sessiondata interface which packetizer implements */
+    pzr->sessiondata = new_sessiondata_interface(pzr,
+        etch_pktizer_session_data,
+        sessbase);  /* sessiondata now owns sessbase */
+
+    /* mirror the session methods on the packetizer itself */
+    pzr->session_data    = etch_pktizer_session_data;
+    pzr->session_control = sessbase->session_control;
+    pzr->session_notify  = sessbase->session_notify;
+    pzr->session_query   = sessbase->session_query;
+
+    /* lastly set session of next lower layer (tcp connection) to our session. */
+    /* the implementor of transport, i.e. tcpconnection, is passed as thisx */
+    pzr->transport->set_session (pzr->transport->thisx, pzr->sessiondata);
+
+    return pzr;
+}
+
+
+/*
+ * destroy_packetizer()
+ * etch_packetizer destructor.
+ * when the object is reference counted, one reference is released, and the
+ * object is torn down only once no references remain. the transport of the 
+ * next lower layer is not destroyed here.
+ * returns -1 if thisx is null or if references remain after the decrement,
+ * otherwise the result of destroy_objectex().
+ */
+int destroy_packetizer(etch_packetizer* thisx)
+{
+    if (!thisx) return -1;
+
+    if (thisx->refcount > 0)
+    {   /* release our reference; bail out while others are still outstanding */
+        --thisx->refcount;
+        if (thisx->refcount > 0) return -1;
+    }
+
+    /* owned members are destroyed only when content is not flagged static */
+    if (!is_etchobj_static_content(thisx))
+    {
+        if (NULL != thisx->sessiondata)
+            thisx->sessiondata->destroy(thisx->sessiondata);
+
+        if (NULL != thisx->transportpkt)
+            thisx->transportpkt->destroy(thisx->transportpkt);
+
+        if (NULL != thisx->datalock)
+            thisx->datalock->destroy(thisx->datalock);
+
+        if (NULL != thisx->savebuf)
+            thisx->savebuf->destroy(thisx->savebuf);
+    }
+
+    return destroy_objectex((objmask*) thisx);
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_transportpacket
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etch_pktizer_transport_packet()
+ * delivers packet to transport after adding packet header.
+ * the header (signature, then body length) is written at the buffer's current
+ * index, after which the index is restored such that the transport sees the 
+ * header followed by the packet body.
+ * @param whoto recipient - caller retains this memory.
+ * @param fbuf buffer positioned at the space reserved for the header, 
+ * with the packet data following it.
+ * @return -1 if fewer bytes than a header are available in fbuf, 
+ * otherwise the result of the lower layer's transport_data().
+ */
+int etch_pktizer_transport_packet(etch_packetizer* thisx, etch_who* whoto, etch_flexbuffer* fbuf)
+{
+    int total = 0;
+    size_t startindex = 0;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+
+    /* bytes available must at least cover the header itself */
+    total = (int) etch_flexbuf_avail(fbuf);
+    if (total < ETCH_PKTIZER_HEADERSIZE) return -1;
+
+    startindex = fbuf->index;
+
+    /* header is the signature followed by the length of the body alone */
+    etch_flexbuf_put_int(fbuf, ETCH_PKTIZER_SIG);
+    etch_flexbuf_put_int(fbuf, total - ETCH_PKTIZER_HEADERSIZE);
+
+    /* rewind to the start of the header before handing the buffer down */
+    fbuf->index = startindex;
+
+    /* deliver packet buffer to transport */
+    return thisx->transport->transport_data(thisx->transport->thisx, whoto, fbuf);
+}
+
+
+/**
+ * etch_pktizer_get_session()
+ * @return the i_sessionpacket reference most recently stored by 
+ * etch_pktizer_set_session(). the packetizer does not own this object.
+ */
+i_sessionpacket* etch_pktizer_get_session (etch_packetizer* thisx) 
+{
+    i_sessionpacket* current = NULL;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+    current = thisx->session;
+    return current;
+}
+
+
+/**
+ * etch_pktizer_set_session()
+ * stores the session interface to which the packetizer forwards inbound 
+ * packets and session events, presumably that of the next higher layer.
+ * @param newsession an i_sessionpacket reference. caller owns this object;
+ * only the pointer is stored.
+ */
+void etch_pktizer_set_session (etch_packetizer* thisx, i_sessionpacket* newsession)
+{
+    /* validate the receiver first, then the interface being installed */
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+    ETCH_ASSERT(is_etch_sessionpacket(newsession));
+
+    thisx->session = newsession;
+}
+
+
+/**
+ * etch_pktizer_transport_control()
+ * i_transportpacket::transport_control override.
+ * forwards the control event unchanged to the transport of the next lower layer.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ * @return the result of the lower layer's transport_control().
+ */
+int etch_pktizer_transport_control (etch_packetizer* thisx, etch_event* control, objmask* value)
+{
+    i_transportdata* lower = NULL;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+    lower = thisx->transport;
+    /* the implementor of the interface (lower->thisx) is passed as receiver */
+    return lower->transport_control (lower->thisx, control, value);
+}
+
+
+/**
+ * etch_pktizer_transport_notify()
+ * i_transportpacket::transport_notify override.
+ * forwards the event unchanged to the transport of the next lower layer.
+ * @param evt event, caller relinquishes.
+ * @return the result of the lower layer's transport_notify().
+ */
+int etch_pktizer_transport_notify (etch_packetizer* thisx, etch_event* evt)
+{
+    i_transportdata* lower = NULL;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+    lower = thisx->transport;
+    return lower->transport_notify (lower->thisx, evt);
+}
+
+
+/**
+ * etch_pktizer_transport_query()
+ * i_transportpacket::transport_query override.
+ * forwards the query unchanged to the transport of the next lower layer.
+ * @param query, caller relinquishes.
+ * @return the result of the lower layer's transport_query().
+ */
+objmask* etch_pktizer_transport_query (etch_packetizer* thisx, objmask* query) 
+{
+    i_transportdata* lower = NULL;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+    lower = thisx->transport;
+    return lower->transport_query (lower->thisx, query);
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_sessiondata
+ * - - - - - - - - - - - - - - -
+ */
+  
+/**
+ * etch_pktizer_session_data()
+ * i_sessiondata::session_data override.
+ * delivers data to the session from the transport. the input is treated as a
+ * byte stream: it may hold a partial packet, exactly one packet, or several
+ * packets. bytes which do not yet complete a header or a body are accumulated 
+ * in pzr->savebuf, and the state (pzr->is_wantheader, pzr->bodylength) is 
+ * carried in the packetizer across calls.
+ * @param whofrom from who sent the packet data
+ * caller retains this memory, can be null.
+ * @param fbuf the packet from the packet source positioned at the data, caller retains.
+ * @return 0 success, -1 error (bad packet header, or bytes could not be 
+ * moved into the save buffer). 
+ */
+int etch_pktizer_session_data (etch_packetizer* pzr, etch_who* whofrom, etch_flexbuffer* fbuf)
+{
+    /* two scenarios: the first is that we have no buffered data and the 
+     * entire packet is contained within the buffer. in that case we can skip  
+     * the details and ship the packet directly to the handler. the second is
+     * that a header or body is split across calls, in which case the pieces
+     * are assembled in the save buffer before being shipped.
+     */
+    size_t curlen, curndx, bytes_avail, fbuf_avail, packet_bodylen, bytes_put;
+    int  bytes_needed;
+    etch_flexbuffer* savebuf = NULL;
+    ETCH_ASSERT(is_etch_packetizer(pzr));
+    savebuf = pzr->savebuf;
+
+    /* loop until the input buffer is exhausted. fbuf_avail is unsigned, 
+     * so the 0 >= test below is effectively a test for zero. */
+    while(1) /* while(fbuf.avail() > 0 ... */
+    {   if (0 >= (fbuf_avail = etch_flexbuf_avail(fbuf))) break; 
+
+        /* bytes in the pipeline: those saved previously plus the new input */
+        bytes_avail = savebuf->datalen + fbuf_avail;
+
+        if (pzr->is_wantheader)
+        {   
+            if (bytes_avail >= (unsigned) ETCH_PKTIZER_HEADERSIZE)
+            {   /* there are enough bytes in the pipeline for a header */
+                if (savebuf->datalen == 0)
+                {   /* save buffer is empty, entire header in fbuf */ 
+                    packet_bodylen = etch_pktizer_process_header(pzr, fbuf, FALSE);  
+                }
+                else /* save non empty, header split across save and buf, so ... */
+                {    /* move enough bytes from buf to save to complete a header */
+                    bytes_needed = ETCH_PKTIZER_HEADERSIZE - (int) savebuf->datalen;
+
+                    bytes_put = etch_flexbuf_put_from (savebuf, fbuf, bytes_needed);
+                    if (0 == bytes_put) return -1;
+                    etch_flexbuf_set_index(savebuf, 0);
+                    /* TRUE: have the save buffer reset once the header is read */
+                    packet_bodylen = etch_pktizer_process_header(pzr, savebuf, TRUE);
+                }	
+			
+                /* NOTE(review): packet_bodylen is a size_t, so the int -1 from 
+                 * process_header is detected via conversion to unsigned. */
+                if (packet_bodylen == -1) return -1;  /* bad header */
+				if (packet_bodylen == 0)  continue;	  /* header with empty body */					 
+				pzr->bodylength = packet_bodylen;	
+				pzr->is_wantheader = FALSE;
+            }
+            else /* want header, but too few bytes in the pipeline */
+			{    /* ... so save fbuf to the save buffer, appending at its end */
+                etch_flexbuf_set_index (savebuf, savebuf->datalen);
+                etch_flexbuf_put_from(savebuf, fbuf, -1);
+		    }
+        }
+        else /* didn't need a header, so ... */
+        if (bytes_avail >= pzr->bodylength)
+        {
+            /* we need the body, and there are enough bytes in the pipeline.
+             * three scenarios: the body is entirely in savebuf, the body is 
+             * split, or the body is entirely in fbuf. assert that the body   
+             * is not entirely in save, or we'd have processed it last time.  
+             */
+            ETCH_ASSERT(savebuf->datalen < pzr->bodylength);
+
+            if (savebuf->datalen == 0)
+            {   /* saved buffer is empty, entire body is in fbuf. temporarily 
+                 * shorten fbuf such that it ends at the end of this body. */
+                curlen = fbuf->datalen;
+                curndx = fbuf->index;
+                etch_flexbuf_set_length(fbuf, curndx + pzr->bodylength);
+
+                /* send the packet in the input buffer up the chain to the next 
+                 * higher layer of the transport stack. fyi packetizer.session.thisx 
+                 * is that layer object, ordinarily the messagizer.
+                 * the result of session_packet() is not checked.
+                 */
+                pzr->session->session_packet (pzr->session->thisx, whofrom, fbuf);
+
+                /* fyi the input buffer can contain a partial packet, multiple 
+                 * packets, or whatever, so we may not be done with it yet.
+                 * restore its length and position it just past this body. */
+                etch_flexbuf_set_length(fbuf, curlen);   
+                etch_flexbuf_set_index (fbuf, curndx + pzr->bodylength);
+                pzr->is_wantheader = TRUE;
+            }
+            else 
+            {   /* savebuf.datalen not zero, so body is split across the save buffer 
+                 * and the input buffer. move enough bytes from input buffer fbuf 
+                 * to complete the packet body. 
+                 */
+                bytes_needed = (int) (pzr->bodylength - savebuf->datalen);
+                bytes_put = etch_flexbuf_put_from (savebuf, fbuf, bytes_needed);
+                /* bytes_put is unsigned, so this is effectively a test for zero */
+                if (bytes_put <= 0) return -1;
+                etch_flexbuf_set_index(savebuf, 0);
+
+                /* send the newly-isolated packet up the chain to the next higher
+                 * layer of the transport stack. fyi packetizer.session.thisx 
+                 * is that layer object, ordinarily the messagizer.
+                 * the result of session_packet() is not checked.
+                 */
+                pzr->session->session_packet (pzr->session->thisx, whofrom, savebuf);
+                
+                etch_flexbuf_reset(savebuf);
+                pzr->is_wantheader = TRUE;                
+            }
+        }
+        else /* need body but too few bytes in pipeline to complete it */
+        {    /* ... so save the input buffer to the save buffer.
+              * presumably this consumes all of fbuf, such that the loop 
+              * ends at the next availability check - TODO confirm. */
+            bytes_put = etch_flexbuf_put_from (savebuf, fbuf, ETCH_FLEXBUF_PUT_ALL);
+        }
+    }
+
+    /* buffer is now empty and we're done. the loop is only left via the 
+     * break above, at which point fbuf_avail is zero. */
+    return fbuf_avail == 0? 0: -1;
+}
+
+
+/*
+ * etch_pktizer_process_header()
+ * reads two ints from the current position of fbuf: the packet signature,
+ * which must equal ETCH_PKTIZER_SIG, and then the packet body length.
+ * when is_reset is nonzero and the signature was accepted, fbuf is reset 
+ * after the length read has been attempted, whether or not it succeeded.
+ * returns the body length taken from the header, or -1 if either read 
+ * returned -1, the signature did not match, or the length was negative 
+ * or greater than pzr->maxpacketsize.
+ */
+int etch_pktizer_process_header (etch_packetizer* pzr, etch_flexbuffer* fbuf, const int is_reset)
+{
+    int signature = 0, bodylen = 0, status = 0;
+
+    /* first header field: the signature. no reset is done on this path */
+    status = etch_flexbuf_get_int(fbuf, &signature);
+    if (status == -1) return -1;
+    if (signature != ETCH_PKTIZER_SIG) return -1;
+
+    /* second header field: the body length. validated only after the 
+     * optional reset below, so the reset occurs even for a bad length */
+    status = etch_flexbuf_get_int(fbuf, &bodylen);
+
+    if (is_reset)
+        etch_flexbuf_reset(fbuf);
+
+    if (status == -1) return -1;
+    if (bodylen < 0)  return -1;
+    if (bodylen > (int) pzr->maxpacketsize) return -1;
+
+    return bodylen;
+}
+
+
+/**
+ * etch_pktizer_session_control()
+ * i_sessiondata::session_control override.
+ * passes the control event and its value through to the session_control 
+ * of the session interface held by this packetizer.
+ * @param thisx the packetizer, asserted via is_etch_packetizer.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ * @return whatever the session's session_control returns.
+ */
+int etch_pktizer_session_control (etch_packetizer* thisx, etch_event* control, objmask* value)
+{
+    int result = 0;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+
+    result = thisx->session->session_control (thisx->session->thisx, control, value);
+    return result;
+}
+
+
+/**
+ * etch_pktizer_session_notify()
+ * i_sessiondata::session_notify override.
+ * passes the event through to the session_notify of the session 
+ * interface held by this packetizer.
+ * @param thisx the packetizer, asserted via is_etch_packetizer.
+ * @param evt event, caller relinquishes.
+ * @return whatever the session's session_notify returns.
+ */
+int etch_pktizer_session_notify  (etch_packetizer* thisx, etch_event* evt)
+{
+    int result = 0;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+
+    result = thisx->session->session_notify (thisx->session->thisx, evt);
+    return result;
+}
+
+
+/**
+ * etch_pktizer_session_query()
+ * i_sessiondata::session_query override.
+ * passes the query through to the session_query of the session 
+ * interface held by this packetizer.
+ * @param thisx the packetizer, asserted via is_etch_packetizer.
+ * @param query, caller relinquishes.
+ * @return whatever the session's session_query returns.
+ */
+objmask* etch_pktizer_session_query (etch_packetizer* thisx, objmask* query) 
+{
+    objmask* reply = NULL;
+    ETCH_ASSERT(is_etch_packetizer(thisx));
+
+    reply = thisx->session->session_query (thisx->session->thisx, query);
+    return reply;
+}