You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@etch.apache.org by jd...@apache.org on 2009/04/22 19:25:51 UTC
svn commit: r767594 [36/43] - in /incubator/etch/trunk/binding-c/runtime/c:
./ ext/ ext/hashtab/ ext/lib/ inc/ lib/ project/ project/$etchstop/
project/bin/ project/etch/ project/logcli/ project/logsrv/ project/notes/
project/test/ project/test/logcli/...
Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_connection.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,259 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_connection.c
+ * connection client and server classes - tcp, udp
+ */
+
+#include "apr_time.h"
+#include "apr_network_io.h"
+#include "etchthread.h"
+#include "etch_connection.h"
+#include "etch_encoding.h"
+#include "etchflexbuf.h"
+#include "etchlog.h"
+#include "etch_global.h"
+
+const wchar_t* ETCH_CONNECTION_RECONDELAY = L"TcpConnection.reconnect_delay";
+const wchar_t* ETCH_CONNECTION_AUTOFLUSH = L"TcpConnection.autoFlush";
+const wchar_t* ETCH_CONNECTION_KEEPALIVE = L"TcpConnection.keepAlive";
+const wchar_t* ETCH_CONNECTION_LINGERTIME = L"TcpConnection.lingerTime";
+const wchar_t* ETCH_CONNECTION_NODELAY = L"TcpConnection.noDelay";
+const wchar_t* ETCH_CONNECTION_TRAFCLASS = L"TcpConnection.trafficClass";
+const wchar_t* ETCH_CONNECTION_BUFSIZE = L"TcpConnection.bufferSize";
+const wchar_t* ETCH_TCPLISTENER_BACKLOG = L"TcpListener.backlog";
+char* ETCHCONX = "CONX";
+
+unsigned next_etch_connection_id();
+
+
+/*
+ * is_good_tcp_params()
+ */
+int is_good_tcp_params(etch_url* url, void* resources, etch_rawsocket* socket)
+{
+ int whicherr = 0;
+
+ if (NULL == socket)
+ {
+ if (NULL == url->host) whicherr = 0x1;
+ if (url->port <= 0 || url->port > 0xffff) whicherr |= 0x2;
+ }
+
+ return whicherr == 0;
+}
+
+
+/*
+ * etch_socket_set_timeout
+ * set timeout for specified socket
+ */
+int etch_socket_set_timeout(etch_rawsocket* socket, const unsigned ms)
+{
+ const int result = apr_socket_timeout_set(socket, ms * 1000);
+ return 0 == result? 0: -1;
+}
+
+
+/*
+ * etch_socket_get_timeout
+ * get timeout set on specified socket
+ */
+int etch_socket_get_timeout(etch_rawsocket* socket, unsigned* retms)
+{
+ int64 val = 0;
+ if (0 != apr_socket_timeout_get(socket, &val)) return -1;
+ if (retms) *retms = (unsigned) (val / 1000);
+ return 0;
+}
+
+
+/**
+ * etchconx_set_socket_options()
+ */
+int etchconx_set_socket_options(void *c)
+{
+ return 0;
+}
+
+
+/**
+ * next_etch_connection_id()
+ * return a unique ID used to identify a connection instance
+ */
+unsigned next_etch_connection_id()
+{
+ do { apr_atomic_inc32 ((volatile apr_uint32_t*) &connection_id_farm);
+ } while(connection_id_farm == 0);
+
+ return connection_id_farm;
+}
+
+
+/**
+ * etch_defconx_on_data()
+ * default receive data handler
+ */
+int etch_defconx_on_data (void* conx, const int unused, int length, char* data)
+{
+ char* msgmask = "connxn %d no data handler provided for %d bytes\n";
+ etch_connection* cx = (etch_connection*) conx;
+ ETCH_ASSERT(is_etch_connection(cx));
+ etchlog(ETCHCONX, ETCHLOG_WARNING, msgmask, cx->conxid, length);
+ return 0;
+}
+
+
+/*
+ * etch_defconx_on_event()
+ * default handler for connection events
+ */
+int etch_defconx_on_event (void* c, const int e, int p1, void* p2)
+{
+ return 0;
+}
+
+
+/*
+ * etchconx_wait_for()
+ * block until condition is met or timeout
+ */
+int etchconx_wait_for (etch_connection* cx, const int64 waitval, const int timeoutms)
+{
+ etchwait* waiter = cx->waiter;
+ int64* pcondvar = &cx->waitcond;
+ int result = 0;
+
+ result = waiter->timed_waitequal (waiter, pcondvar, waitval, timeoutms);
+ return result;
+}
+
+
+/*
+ * etchconx_init_waitstate()
+ * pre-set the condition variable target for wait state.
+ * this sets wait state to "waiting", and must be invoked prior to waiting up or down,
+ * if the up or down might complete before the user calls wait up or down. for example,
+ * if we want to establish a connection, then wait for it to come up, the connection
+ * may likely complete on another thread prior to the user calling wait up. we do this
+ * pre-setting of the condition variable to provide a target for the connection to
+ * mark the state.
+ */
+void etchconx_init_waitstate (etch_connection* cx)
+{
+ ETCH_ASSERT(cx && cx->waiter);
+ cx->waiter->cond_var = &cx->waitcond;
+}
+
+
+/*
+ * etchconx_wait_up()
+ * block until connection comes up or timeout
+ */
+int etchconx_wait_up (etch_connection* cx, int timeoutms)
+{
+ cx->waiter->cond_waitfor = 0; /* ETCH_CONXEVT_UP; semikludge */
+ return etchconx_wait_for(cx, ETCH_CONXEVT_UP, timeoutms);
+}
+
+
+/*
+ * etchconx_wait_down()
+ * block until connection is closed or timeout
+ */
+int etchconx_wait_down (etch_connection* cx, int timeoutms)
+{
+ cx->waiter->cond_waitfor = 0; /* ETCH_CONXEVT_DOWN; semikludge */
+ return etchconx_wait_for(cx, ETCH_CONXEVT_DOWN, timeoutms);
+}
+
+
+/*
+ * new_etch_socket()
+ * socket object wrapper constructor
+ */
+etch_socket* new_etch_socket(etch_rawsocket* rawsocket)
+{
+ etch_socket* newobj = (etch_socket*) new_object (sizeof(etch_socket),
+ ETCHTYPEB_SOCKET, CLASSID_ETCHSOCKET);
+ /* fyi this object uses the generic destructor since the rawsocket
+ * is allocated from the apr pool and not explicitly freed by etch */
+ newobj->value = rawsocket;
+ return newobj;
+}
+
+
+/*
+ * etch_destroy_connection()
+ * free memory owned by connection.
+ */
+int etch_destroy_connection(etch_connection* cx)
+{
+ if (cx)
+ { destroy_etchwait (cx->waiter);
+ destroy_etchmutex(cx->mutex);
+ etch_free(cx->hostname);
+ if (cx->is_ownpool) /* free apr subpool */
+ apr_pool_destroy(cx->aprpool);
+ }
+ return 0;
+}
+
+
+/*
+ * etch_init_connection()
+ * initialize an etch_connection (abstract base)
+ */
+int etch_init_connection (etch_connection* cx, etch_rawsocket* socket, void* owner)
+{
+ memset(cx, 0, sizeof(etch_connection));
+
+ cx->conxid = next_etch_connection_id();
+ cx->owner = owner;
+
+ /* set memory pool used for APR objects associated with this connection.
+ * if the connection is being created with an existing socket, use the
+ * memory pool already assigned to it, otherwise partition a new subpool.
+ * if connection allocates a subpool, connection will free it on destroy.
+ */
+ if (socket)
+ cx->aprpool = apr_socket_pool_get(socket);
+
+ if (cx->aprpool == NULL)
+ { apr_pool_create(&cx->aprpool, etch_apr_mempool);
+ cx->is_ownpool = TRUE;
+ }
+
+ if (cx->aprpool == NULL)
+ { cx->aprpool = etch_apr_mempool;
+ cx->is_ownpool = FALSE;
+ }
+
+ cx->sig = ETCH_CONX_SIG;
+ cx->mutex = new_mutex(cx->aprpool, TRUE);
+ cx->waiter = new_wait (cx->aprpool);
+
+ cx->bufsize = ETCH_CONX_DEFAULT_BUFSIZE;
+ cx->on_event = etch_defconx_on_event;
+ cx->on_data = etch_defconx_on_data;
+ cx->set_socket_options = etchconx_set_socket_options;
+ return 0;
+}
+
+
Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_conxevent.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,334 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_conxevent.c
+ * event handler overrides for the various connection superclasses
+ */
+
+#include "apr_network_io.h"
+#include "etchthread.h"
+#include "etch_tcpconxn.h"
+#include "etch_tcpserver.h"
+#include "etchlog.h"
+#include "etch_global.h"
+
+
+/*
+ * etch_deftcplistener_on_event()
+ * default handler for listener events
+ */
+int etch_deftcplistener_on_event(etch_tcp_server* l, etch_tcp_connection* c, const int e, int p1, void* p2)
+{
+ static char cxstr[24], *smask = "server %u";
+
+ if (c)
+ { c->cx.listener = (etch_object*) l;
+ return c->cx.on_event(c, e, p1, p2);
+ }
+
+ sprintf(cxstr, smask, l->listener_id);
+
+ switch(e)
+ {
+ case ETCH_CONXEVT_CREATED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s created\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_CREATERR:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not created\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_OPENING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s opening ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_DESTROYING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroying ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_DESTROYED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroyed\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_SHUTDOWN:
+ etchlog(ETCHCONX, ETCHLOG_WARNING, "%s shutdown request detected\n", cxstr);
+ break;
+ }
+
+ return 0;
+}
+
+
+/*
+ * etch_tcpconx_on_event()
+ * default handler for connection events
+ */
+int etch_tcpconx_on_event(etch_tcp_connection* c, const int e, int p1, void* p2)
+{
+ int result = 0, lid = 0;
+ static char cxstr[32], estr[128];
+ static char *scmask = "server %u connxn %u", *cmask = "connxn %u";
+
+ if (is_etch_tcpserver(c->cx.listener))
+ lid = ((etch_tcp_server*)(c->cx.listener))->listener_id;
+ if (lid)
+ sprintf(cxstr, scmask, lid, c->cx.conxid);
+ else sprintf(cxstr, cmask, c->cx.conxid);
+
+ switch(e)
+ {
+ case ETCH_CONXEVT_RECEIVING:
+ etchlog(ETCHCONX, ETCHLOG_XDEBUG, "%s begin receive (block) ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_RECEIVERR:
+ apr_strerror(p1, estr, 128);
+ if (IS_ETCH_SOCKET_TIMEOUT(p1))
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s socket receive timed out\n", cxstr);
+ else
+ if (p1 == APR_OTHER_END_CLOSED)
+ etchlog(ETCHCONX, ETCHLOG_WARNING,"%s connection was broken\n", cxstr);
+ else etchlog(ETCHCONX, ETCHLOG_ERROR, "%s apr_socket_recv() %s\n", cxstr, estr);
+ break;
+
+ case ETCH_CONXEVT_RECEIVED: /* "eod=%d", p1 */
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s end receive %d bytes\n", cxstr, (int)(size_t)p2);
+ break;
+
+ case ETCH_CONXEVT_CONXCLOSED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s local connection closed\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_PEERCLOSED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s peer connection closed\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_RECEIVEND:
+ etchlog(ETCHCONX, ETCHLOG_XDEBUG, "%s exit receive loop\n", cxstr, p1);
+ break;
+
+ case ETCH_CONXEVT_SENDING:
+ etchlog(ETCHCONX, ETCHLOG_XDEBUG, "%s begin send\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_SENDERR:
+ apr_strerror(p1, estr, 128);
+ if (IS_ETCH_SOCKET_TIMEOUT(p1))
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s socket send timed out\n", cxstr);
+ else etchlog(ETCHCONX, ETCHLOG_ERROR, "%s apr_socket_send() %s\n", cxstr, estr);
+ break;
+
+ case ETCH_CONXEVT_SENDEND:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s end send %d bytes\n", cxstr, p1);
+ break;
+
+ case ETCH_CONXEVT_RCVPUMP_START:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s start receive pump\n", cxstr, p1);
+ break;
+
+ case ETCH_CONXEVT_RCVPUMP_RECEIVING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s thread %d receiving ...\n", cxstr, p1);
+ break;
+
+ case ETCH_CONXEVT_RCVPUMP_ERR:
+ result = -1;
+ if (p1 == APR_OTHER_END_CLOSED)
+ etchlog(ETCHCONX, ETCHLOG_WARNING, "%s connection was broken\n", cxstr);
+ else etchlog(ETCHCONX, ETCHLOG_ERROR, "%s receive failed\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_RCVPUMP_STOP:
+ result = p1;
+ if (result >= 0)
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s receive pump on thread %d exited\n",
+ cxstr, (int) (size_t) p2);
+ else
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s receive pump abnormal exit\n", cxstr);
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroying accepted connection\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_ACCEPTING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accepting ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_ACCEPTERR:
+ result = -1;
+ apr_strerror((apr_status_t)(size_t)p2, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s apr_socket_accept() %s\n", cxstr, estr);
+ break;
+
+ case ETCH_CONXEVT_ACCEPTED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accepted\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_CREATED:
+ if (p1)
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s created for socket %x\n", cxstr, p1);
+ else
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s created\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_CREATERR:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not created\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_OPENING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s opening ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_OPENERR:
+ result = -1;
+
+ switch(p1)
+ { case 0:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not opened\n", cxstr);
+ break;
+ case 2:
+ apr_strerror((apr_status_t)(size_t)p2, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_WARNING,
+ "%s socket connect: %s\n", cxstr, estr);
+ break;
+ case 3:
+ apr_strerror((apr_status_t)(size_t)p2, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s socket create: %s\n", cxstr, estr);
+ break;
+ case 4:
+ apr_strerror((apr_status_t)(size_t)p2, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s sockaddr info: %s\n", cxstr, estr);
+ break;
+ case 5:
+ apr_strerror((apr_status_t)(size_t)p2, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s socket bind: %s\n", cxstr, estr);
+ break;
+ case 6:
+ apr_strerror((apr_status_t)(size_t)p2, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s socket listen: %s\n", cxstr, estr);
+ break;
+ }
+ break;
+
+ case ETCH_CONXEVT_OPENED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s opened\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_STARTING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s starting ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_STARTERR:
+ result = -1;
+ switch(p1)
+ { case 0:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not started\n", cxstr);
+ break;
+ case 1:
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s etch_threadpool.run()\n", cxstr);
+ break;
+ }
+ break;
+
+ case ETCH_CONXEVT_STARTED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s started\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_ACCEPTPUMPEXIT:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accept pump on thread %d exited\n", cxstr, p1);
+ break;
+
+ case ETCH_CONXEVT_ACCEPTPUMPEXITERR:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s accept pump abnormal exit\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_LISTENED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s server id is %d\n", cxstr, p1);
+ break;
+
+ case ETCH_CONXEVT_UP:
+ case ETCH_CONXEVT_DOWN:
+ break;
+
+ case ETCH_CONXEVT_SOCKOPTERR:
+ result = 1; /* to increment counter */
+ apr_strerror(p1, estr, 128);
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s set socket option %s: %s\n", cxstr, p2, estr);
+ break;
+
+ case ETCH_CONXEVT_SHUTDOWN:
+ etchlog(ETCHCONX, ETCHLOG_WARNING, "%s shutdown request detected\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_STOPPING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s stopping ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_STOPERR:
+ result = -1;
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s not stopped\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_STOPPED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s stopped\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_CLOSING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s closing ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_CLOSERR:
+ result = -1;
+
+ switch(p1)
+ { case 0:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s not closed\n", cxstr);
+ break;
+ case 1:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s close when not open\n", cxstr);
+ break;
+ case 2:
+ etchlog(ETCHCONX, ETCHLOG_ERROR, "%s concurrent close denied\n", cxstr);
+ break;
+ case 3:
+ etchlog(ETCHCONX, ETCHLOG_ERROR,
+ "%s apr_socket_close() error %d\n", cxstr, (int)(size_t)p2);
+ break;
+ }
+ break;
+
+ case ETCH_CONXEVT_CLOSED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s closed\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_DESTROYING:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroying ...\n", cxstr);
+ break;
+
+ case ETCH_CONXEVT_DESTROYED:
+ etchlog(ETCHCONX, ETCHLOG_DEBUG, "%s destroyed\n", cxstr);
+ break;
+ }
+
+ return result;
+}
+
Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_mailboxmgr.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,120 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_mailboxmgr.c
+ * i_mailbox manager interface
+ */
+
+#include "etch_mailboxmgr.h"
+#include "etch_message.h"
+#include "etch_global.h"
+#include "etchlog.h"
+
+int destroy_mailbox_manager_interface(i_mailbox_manager*);
+int etchmailboxmgr_def_headersize(i_mailbox_manager*);
+int etchmailboxmgr_def_transport_call(i_mailbox_manager*, etch_who*, etch_message*, void**);
+int etchmailboxmgr_def_redeliver (i_mailbox_manager*, etch_who*, etch_message*);
+int etchmailboxmgr_def_unregister(i_mailbox_manager*, i_mailbox*);
+
+
+/**
+ * new_mailboxmgr_interface()
+ * i_mailbox_manager constructor.
+ * @param thisx the mailbox manager object.
+ * @param itm transportmesssage interface, caller retains, can be null.
+ * @param ism sessionmessage interface, caller retains, can be null.
+ */
+i_mailbox_manager* new_mailboxmgr_interface(void* thisx,
+ i_transportmessage* itm, i_sessionmessage* ism)
+{
+ i_mailbox_manager* newi = (i_mailbox_manager*) new_object
+ (sizeof(i_mailbox_manager), ETCHTYPEB_MBOX_MANAGER, CLASSID_MBOX_MANAGER);
+
+ newi->thisx = thisx;
+ newi->clone = clone_null;
+ newi->destroy = destroy_mailbox_manager_interface;
+
+ newi->transport_call = etchmailboxmgr_def_transport_call;
+ newi->redeliver = etchmailboxmgr_def_redeliver;
+ newi->unregister = etchmailboxmgr_def_unregister;
+
+ newi->itm = itm? itm: new_transportmsg_interface(newi, NULL, NULL);
+ newi->transport_message = newi->itm->transport_message;
+ newi->transport_message = newi->itm->transport_message;
+ newi->transport_control = newi->itm->transport_control;
+ newi->transport_notify = newi->itm->transport_notify;
+ newi->transport_query = newi->itm->transport_query;
+ newi->get_session = newi->itm->get_session;
+ newi->set_session = newi->itm->set_session;
+
+ return newi;
+}
+
+
+/**
+ * destroy_mailbox_manager_interface()
+ * i_mailbox_manager destructor
+ */
+int destroy_mailbox_manager_interface(i_mailbox_manager* mgr)
+{
+ if (NULL == mgr) return -1;
+ if (mgr->refcount > 0 && --mgr->refcount > 0) return -1;
+
+ if (!is_etchobj_static_content(mgr))
+ {
+ if (mgr->ism)
+ mgr->ism->destroy(mgr->ism);
+
+ if (mgr->itm)
+ mgr->itm->destroy(mgr->itm);
+ }
+
+ return destroy_objectex((objmask*)mgr);
+}
+
+
+/**
+ * etchmailboxmgr_def_transport_call()
+ * default implementation of i_mailbox_manager::transport_call()
+ */
+int etchmailboxmgr_def_transport_call(i_mailbox_manager* imbm, etch_who* whoto, etch_message* msg, i_mailbox** out)
+{
+ return -1;
+}
+
+
+/**
+ * etchmailboxmgr_def_redeliver()
+ * default implementation of i_mailbox_manager::redeliver()
+ */
+int etchmailboxmgr_def_redeliver(i_mailbox_manager* imbm, etch_who* whofrom, etch_message* msg)
+{
+ return -1;
+}
+
+
+/**
+ * etchmailboxmgr_def_unregister()
+ * default implementation of i_mailbox_manager::unregister()
+ */
+int etchmailboxmgr_def_unregister(i_mailbox_manager* imbm, i_mailbox* mbox)
+{
+ return -1;
+}
+
Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_messagizer.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,450 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_msgizer.c
+ * messagizer accepts packets and translates them to messages,
+ * and it accepts messages and translates them to packets.
+ */
+
+#include "etch_messagizer.h"
+#include "etch_tdformat.h"
+#include "etchthread.h"
+#include "etchlog.h"
+wchar_t* ETCHWMSZ = L"MSGZ";
+char* ETCHMSGZ = "MSGZ";
+
+int destroy_messagizer(etch_messagizer*);
+int etch_msgizer_transport_message(etch_messagizer*, etch_who*, etch_message*);
+int etch_msgizer_session_packet (etch_messagizer*, etch_who*, etch_flexbuffer*);
+int etch_msgizer_session_control (etch_messagizer*, etch_event*, objmask*);
+int etch_msgizer_session_notify (etch_messagizer*, etch_event*);
+objmask* etch_msgizer_session_query (etch_messagizer*, objmask*);
+int etch_msgizer_transport_control(etch_messagizer*, etch_event*, objmask*);
+int etch_msgizer_transport_notify (etch_messagizer*, etch_event*);
+objmask* etch_msgizer_transport_query (etch_messagizer*, objmask*);
+
+
+/* - - - - - - - - - - - - - - -
+ * etch_messagizer
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * new_messagizer()
+ * etch_messagizer public constructor
+ * @param ixport transport interface owned by caller
+ * @param uri a URI string owned by caller.
+ * @param resx a resources map owned by caller.
+ */
+
+etch_messagizer* new_messagizer (i_transportpacket* ixp, wchar_t* uri, etch_resources* resx)
+{
+ etch_url* url = new_url(uri);
+
+ etch_messagizer* messagizer = new_messagizer_a(ixp, url, resx);
+
+ url->destroy(url);
+ return messagizer;
+}
+
+
+/**
+ * new_messagizer_a()
+ * etch_messagizer private constructor
+ * @param ipacket transport interface owned by caller
+ * @param uri a URI string owned by caller.
+ * @param resxmap a resources map owned by caller.
+ */
+etch_messagizer* new_messagizer_a (i_transportpacket* ixp, etch_url* url, etch_resources* resxmap)
+{
+ tagdata_format_factory* formatfactory = NULL;
+ etch_messagizer* messagizer = NULL;
+ i_transport* itransport = NULL;
+ etch_value_factory* vf = NULL;
+ etch_string* formatobj = NULL;
+ i_session* isession = NULL;
+ etchmutex* mutex = NULL;
+ int result = -1;
+
+ do
+ { formatobj = (etch_string*) etchurl_get_term (url, ETCH_RESXKEY_MSGIZER_FORMAT);
+
+ if (NULL == formatobj && NULL != resxmap)
+ formatobj = (etch_string*) etch_resources_get (resxmap, ETCH_RESXKEY_MSGIZER_FORMAT);
+
+ if (NULL == formatobj)
+ { etchlogw(ETCHWMSZ, ETCHLOG_ERROR, L"URI missing format specifier '%s'\n",
+ ETCH_RESXKEY_MSGIZER_FORMAT);
+ break;
+ }
+
+ formatfactory = get_format_factory(formatobj->v.valw);
+ if (NULL == formatfactory)
+ { etchlogw(ETCHWMSZ, ETCHLOG_ERROR, L"no format '%s' is installed\n",
+ formatobj->v.valw);
+ break;
+ }
+
+ vf = resxmap? (etch_value_factory*) etch_resources_get(resxmap,
+ ETCH_RESXKEY_MSGIZER_VALUFACT): NULL;
+ if (NULL == vf)
+ { etchlogw(ETCHWMSZ, ETCHLOG_ERROR, L"no value factory '%s' is installed\n",
+ ETCH_RESXKEY_MSGIZER_VALUFACT);
+ break;
+ }
+
+ if (NULL == (mutex = new_mutex(etch_apr_mempool, ETCHMUTEX_NESTED))) break;
+
+ /* - - - - - - - - - - - - - - -
+ * etch_messagizer
+ * - - - - - - - - - - - - - - -
+ */
+ messagizer = (etch_messagizer*) new_object
+ (sizeof(etch_messagizer), ETCHTYPEB_MESSAGIZER, CLASSID_MESSAGIZER);
+
+ messagizer->destroy = destroy_messagizer;
+ messagizer->clone = clone_null;
+ messagizer->msglock = mutex;
+ messagizer->msgbuf = new_flexbuffer(ETCH_DEFSIZE); /* 2K default */
+
+ /* set our transport to that of next lower layer (packetizer) */
+ messagizer->transport = ixp; /* i_transportpacket owned by caller */
+
+ /* - - - - - - - - - - - - - - -
+ * i_transportmessage
+ * - - - - - - - - - - - - - - -
+ */
+ itransport = new_transport_interface_ex (messagizer,
+ (etch_transport_control) etch_msgizer_transport_control,
+ (etch_transport_notify) etch_msgizer_transport_notify,
+ (etch_transport_query) etch_msgizer_transport_query,
+ etch_msgizer_get_session,
+ etch_msgizer_set_session);
+
+ /* instantiate i_transportmessage interface which messagizer implements */
+ messagizer->transportmsg = new_transportmsg_interface(messagizer,
+ etch_msgizer_transport_message,
+ itransport); /* transportmsg now owns itransport */
+
+ /* copy i_transportmessage interface methods up to messagizer */
+ messagizer->transport_message = etch_msgizer_transport_message;
+ messagizer->transport_control = itransport->transport_control;
+ messagizer->transport_notify = itransport->transport_notify;
+ messagizer->transport_query = itransport->transport_query;
+ messagizer->get_session = itransport->get_session;
+ messagizer->set_session = itransport->set_session;
+
+ /* - - - - - - - - - - - - - - -
+ * i_sessionpacket
+ * - - - - - - - - - - - - - - -
+ */
+ isession = new_session_interface(messagizer,
+ (etch_session_control) etch_msgizer_session_control,
+ (etch_session_notify) etch_msgizer_session_notify,
+ (etch_session_query) etch_msgizer_session_query);
+
+ /* instantiate i_sessionpacket interface which messagizer implements */
+ messagizer->sessionpkt = new_sessionpkt_interface(messagizer,
+ etch_msgizer_session_packet,
+ isession); /* transportmsg now owns isession */
+
+ /* copy session interface to parent */
+ messagizer->session_packet = etch_msgizer_session_packet;
+ messagizer->session_control = isession->session_control;
+ messagizer->session_notify = isession->session_notify;
+ messagizer->session_query = isession->session_query;
+
+ /* finally set session of next lower layer to our session */
+ /* fyi we must pass the implementor of transport as thisx, i.e. packetizer */
+ messagizer->transport->set_session (messagizer->transport->thisx, messagizer->sessionpkt);
+
+ /* - - - - - - - - - - - - - - -
+ * tagged data in out
+ * - - - - - - - - - - - - - - -
+ */
+ messagizer->tdi = formatfactory->new_tagdata_input(vf);
+ if (NULL == messagizer->tdi) break;
+
+ messagizer->tdo = formatfactory->new_tagdata_output(vf);
+ if (NULL == messagizer->tdo) break;
+
+ result = 0;
+
+ } while(0);
+
+
+ if (-1 == result)
+ {
+ if (vf)
+ vf->destroy(vf);
+
+ if (messagizer)
+ messagizer->destroy(messagizer);
+
+ messagizer = NULL;
+ }
+
+ if (formatfactory)
+ formatfactory->destroy(formatfactory);
+
+ return messagizer;
+}
+
+
+/**
+ * destroy_messagizer()
+ * destructor for etch_messagizer
+ */
+int destroy_messagizer (etch_messagizer* thisx)
+{
+ if (thisx->refcount > 0 && --thisx->refcount > 0) return -1;
+
+ if (!is_etchobj_static_content(thisx))
+ {
+ if (thisx->transportmsg)
+ thisx->transportmsg->destroy(thisx->transportmsg);
+
+ if (thisx->sessionpkt)
+ thisx->sessionpkt->destroy(thisx->sessionpkt);
+
+ if (thisx->tdi)
+ thisx->tdi->destroy(thisx->tdi);
+
+ if (thisx->tdo)
+ thisx->tdo->destroy(thisx->tdo);
+
+ if (thisx->msgbuf)
+ thisx->msgbuf->destroy(thisx->msgbuf);
+
+ if (thisx->msglock)
+ thisx->msglock->destroy(thisx->msglock);
+ }
+
+ return destroy_objectex((objmask*) thisx);
+}
+
+
+/**
+ * etch_msgizer_get_transport()
+ * @return a reference to the messagizer transport interface.
+ * this object is owned by whatever object created the messagizer.
+ */
+i_transportpacket* etch_msgizer_get_transport (etch_messagizer* thisx)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ return thisx->transport;
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_transportmessage
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etch_msgizer_transport_message()
+ * i_transportmessage::transport_message override.
+ * serializes message and delivers its data to transport
+ * @param whoto recipient - caller retains this memory, can be null.
+ * @param message the message
+ * caller relinquishes this memory on success, retains on failure.
+ * @return 0 success, -1 error.
+ */
+int etch_msgizer_transport_message (etch_messagizer* thisx, etch_who* whoto,
+ etch_message* msg)
+{
+ int result = 0;
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ thisx->msglock->acquire(thisx->msglock);
+
+ do
+ { const int headersize = thisx->transport->get_headersize (thisx->transport);
+
+ etch_flexbuf_skip (thisx->msgbuf, headersize, TRUE);
+
+ /* serialize the message to the buffer */
+ result = thisx->tdo->vtab->write_message (thisx->tdo, msg, thisx->msgbuf);
+
+ if (-1 == result) break;
+
+ etch_flexbuf_set_index (thisx->msgbuf, 0);
+
+ /* deliver packet buffer to transport */ /* msgizer->itp->pktizer */
+ result = thisx->transport->transport_packet (thisx->transport->thisx,
+ whoto, thisx->msgbuf);
+
+ if (0 == result)
+ msg->destroy(msg);
+
+ } while(0);
+
+ thisx->msglock->release(thisx->msglock);
+ return result;
+}
+
+
+/**
+ * etch_msgizer_get_session()
+ * @return a reference to the messagizer i_sessionmessage interface.
+ * this object is owned by whatever object set the messagizer session.
+ */
+i_sessionmessage* etch_msgizer_get_session (etch_messagizer* thisx)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ return thisx->session;
+}
+
+
+/**
+ * etch_msgizer_set_session()
+ * @param session an i_sessionmessage reference. caller owns this object.
+ */
+void etch_msgizer_set_session (etch_messagizer* thisx, i_sessionmessage* session)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ ETCH_ASSERT(is_etch_sessionmsg(session));
+ thisx->session = session;
+}
+
+
+/**
+ * etch_msgizer_transport_control()
+ * i_transportmessage::transport_control override.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ */
+int etch_msgizer_transport_control (etch_messagizer* thisx, etch_event* control, objmask* value)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ /* mzr itp mzr itp packetizer */
+ return thisx->transport->transport_control (thisx->transport->thisx, control, value);
+}
+
+
+/**
+ * etch_msgizer_transport_notify()
+ * i_transportmessage::transport_notify override.
+ * @param evt, caller relinquishes.
+ */
+int etch_msgizer_transport_notify (etch_messagizer* thisx, etch_event* evt)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ return thisx->transport->transport_notify (thisx->transport->thisx, evt);
+}
+
+
+/**
+ * etch_msgizer_transport_query()
+ * i_transportmessage::transport_query override.
+ * @param query, caller relinquishes.
+ */
+objmask* etch_msgizer_transport_query (etch_messagizer* thisx, objmask* query)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+ return thisx->transport->transport_query (thisx->transport->thisx, query);
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_sessionpacket
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etch_msgizer_session_packet()
+ * i_sessionpacket::session_packet override.
+ * delivers data to the session from the transport.
+ * @param from from who sent the packet.
+ * caller retains memory for this object, can be null.
+ * @param buffer the packet from the packet source.
+ * caller retains memory for this object.
+ * @return 0 success, -1 error (exception condition)
+ */
+int etch_msgizer_session_packet (etch_messagizer* thisx, etch_who* whofrom, etch_flexbuffer* fbuf)
+{
+ etch_message* msg = NULL;
+ int is_message_handled = FALSE;
+ ETCH_ASSERT(is_etch_messagizer(thisx));
+
+ /* create an etch message from the packetized data */
+ msg = thisx->tdi->vtab->read_message (thisx->tdi, fbuf);
+
+ if (NULL == msg) return -1;
+
+ /* send the new message up via session.
+ * memory management rules are: if session_message() handles the message,
+ * it owns msg memory. otherwise (if not handled), msg memory is owned by
+ * the unwanted_message wrapper created here, which itself is owned by the
+ * session_notify() destination.
+ */
+ is_message_handled /* call mailbox mgr to queue the new message up to a mailbox */
+ = (0 == thisx->session->session_message (thisx->session->thisx, whofrom, msg));
+
+ /* if the message was not handled, e.g. the message is an exception returned
+ * from a one-way message and there was therefore no mailbox to receive it,
+ * forward the message to the session via mailbox manager.
+ */
+ if (!is_message_handled)
+ { etchlog (ETCHMSGZ, ETCHLOG_DEBUG, "unable to post message to a mailbox\n"),
+ etchlog (ETCHMSGZ, ETCHLOG_DEBUG, "deferring '%s' to session\n", message_aname(msg));
+ thisx->session->session_notify (thisx->session->thisx, new_unwanted_message(whofrom, msg));
+ }
+
+ /* regardless we have relinquished msg at this point. it is now either queued up
+ * in a mailbox, or forwarded in the unwanted message wrapper above. */
+ return 0;
+}
+
+
+/**
+ * etch_msgizer_session_control()
+ * i_sessionpacket::session_control override.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ */
+int etch_msgizer_session_control (etch_messagizer* thisx, etch_event* control, objmask* value)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx)); /* ism mbmgr */
+ return thisx->session->session_control (thisx->session->thisx, control, value);
+}
+
+
+/**
+ * etch_msgizer_session_notify()
+ * i_sessionpacket::session_notify override.
+ * @param event, caller relinquishes.
+ */
+int etch_msgizer_session_notify (etch_messagizer* thisx, etch_event* evt)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx)); /* ism mbmgr */
+ return thisx->session->session_notify (thisx->session->thisx, evt);
+}
+
+
+/**
+ * etch_msgizer_session_query()
+ * i_sessionpacket::session_query override.
+ * @param query, caller relinquishes.
+ */
+objmask* etch_msgizer_session_query (etch_messagizer* thisx, objmask* query)
+{
+ ETCH_ASSERT(is_etch_messagizer(thisx)); /* ism mbmgr */
+ return thisx->session->session_query (thisx->session->thisx, query);
+}
+
Added: incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c?rev=767594&view=auto
==============================================================================
--- incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c (added)
+++ incubator/etch/trunk/binding-c/runtime/c/src/transport/etch_packetizer.c Wed Apr 22 17:25:43 2009
@@ -0,0 +1,506 @@
+/* $Id$
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * etch_packetizer.c
+ * packetizes a stream data source. Reads and verfifies a packet header consisting
+ * of a 32-bit signature and 32-bit length. verifies the flag, using length extracted
+ * from header; reads packet data and forwards it to the packet handler. as a packet
+ * source, accepts a packet and prepends a packet header prior to delivering it to a
+ * data source.
+ */
+
+#include "apr_network_io.h"
+#include "etch_packetizer.h"
+#include "etch_global.h"
+#include "etch_url.h"
+#include "etchlog.h"
+
+wchar_t* ETCHPKZRW = L"PKZR";
+char* ETCHPKZR = "PKZR";
+
+int destroy_packetizer(etch_packetizer*);
+etch_packetizer* new_packetizer_b (i_transportdata*, const int maxpktsize);
+int etch_pktizer_transport_packet (etch_packetizer*, etch_who*, etch_flexbuffer*);
+int etch_pktizer_session_data (etch_packetizer*, etch_who*, etch_flexbuffer*);
+int etch_pktizer_session_control (etch_packetizer*, etch_event*, objmask*);
+int etch_pktizer_session_notify (etch_packetizer*, etch_event*);
+objmask* etch_pktizer_session_query (etch_packetizer*, objmask*);
+int etch_pktizer_transport_control(etch_packetizer*, etch_event*, objmask*);
+int etch_pktizer_transport_notify (etch_packetizer*, etch_event*);
+objmask* etch_pktizer_transport_query (etch_packetizer*, objmask*);
+
+const wchar_t* ETCH_PKTIZER_MAX_PKT_SIZE_TERM = L"Packetizer.maxPktSize";
+const int ETCH_PKTIZER_DEFMAXPKTSIZE = 10240;
+const int ETCH_PKTIZER_HEADERSIZE = 8;
+const int ETCH_PKTIZER_SIG = 0xdeadbeef;
+
+
+/* - - - - - - - - - - - - - - - - - - - - - -
+ * packetizer constructor/destructor
+ * - - - - - - - - - - - - - - - - - - - - - -
+ */
+
+/**
+ * new_packetizer()
+ * * etch_packetizer public constructor
+ * @param itd i_transportdata interface owned by caller
+ * @param uri a URI string owned by caller.
+ * @param resources a resources map owned by caller.
+ */
+etch_packetizer* new_packetizer (i_transportdata* itd, wchar_t* uri, etch_resources* resxmap)
+{
+ etch_url* url = new_url(uri);
+
+ etch_packetizer* packetizer = new_packetizer_a (itd, url, resxmap);
+
+ url->destroy(url);
+ return packetizer;
+}
+
+
+/**
+ * new_packetizer_a()
+ * etch_packetizer constructor 2
+ */
+etch_packetizer* new_packetizer_a (i_transportdata* itd, etch_url* url, etch_resources* resxmap)
+{
+ int maxpktsize = 0, result = 0;
+ etch_packetizer* packetizer = NULL;
+
+ result = etchurl_get_integer_term (url, ETCH_PKTIZER_MAX_PKT_SIZE_TERM, &maxpktsize);
+ if (-1 == result || maxpktsize <= 0) maxpktsize = ETCH_PKTIZER_DEFMAXPKTSIZE;
+
+ return new_packetizer_b (itd, maxpktsize);
+}
+
+
+/**
+ * new_packetizer_b()
+ * etch_packetizer private constructor
+ * @param transport the transport interface of the next lower layer of the stack,
+ * that being the connection, e.g. etch_tcp_connection. not owned.
+ * @param maxpktsize maximum number of bytes in a packet (default currently 10240)
+ */
+etch_packetizer* new_packetizer_b (i_transportdata* itd, const int maxpktsize)
+{
+ etch_packetizer* packetizer = NULL;
+ i_transport* itransport = NULL;
+ i_session* isession = NULL;
+ etchmutex* mutex = NULL;
+ int result = -1;
+ ETCH_ASSERT(is_etch_transportdata(itd));
+
+ do
+ {
+ #if(ETCHPZR_HAS_MUTEX)
+ if (NULL == (mutex = new_mutex(etch_apr_mempool, ETCHMUTEX_NESTED))) break;
+ #endif
+
+ /* - - - - - - - - - - - - - - -
+ * etch_packetizer
+ * - - - - - - - - - - - - - - -
+ */
+ packetizer = (etch_packetizer*) new_object
+ (sizeof(etch_packetizer), ETCHTYPEB_PACKETIZER, CLASSID_PACKETIZER);
+
+ packetizer->destroy = destroy_packetizer;
+ packetizer->clone = clone_null;
+ packetizer->datalock = mutex;
+
+ packetizer->headersize = ETCH_PKTIZER_HEADERSIZE;
+ packetizer->is_wantheader = TRUE;
+ packetizer->maxpacketsize = maxpktsize > 0? maxpktsize: ETCH_PKTIZER_DEFMAXPKTSIZE;
+
+ packetizer->savebuf = new_flexbuffer(ETCH_DEFSIZE); /* 2K default */
+
+ /* set our transport to that of next lower layer (connection) */
+ packetizer->transport = itd; /* not owned */
+
+
+ /* - - - - - - - - - - - - - - -
+ * i_transportpacket
+ * - - - - - - - - - - - - - - -
+ */
+ itransport = new_transport_interface_ex (packetizer,
+ (etch_transport_control) etch_pktizer_transport_control,
+ (etch_transport_notify) etch_pktizer_transport_notify,
+ (etch_transport_query) etch_pktizer_transport_query,
+ etch_pktizer_get_session,
+ etch_pktizer_set_session);
+
+ /* instantiate i_transportpacket interface which packetizer implements */
+ packetizer->transportpkt = new_transportpkt_interface (packetizer,
+ etch_pktizer_transport_packet,
+ itransport); /* transportpkt now owns itransport */
+
+ /* copy i_transportpacket interface methods up to packetizer */
+ packetizer->transport_packet = etch_pktizer_transport_packet;
+ packetizer->transport_control = itransport->transport_control;
+ packetizer->transport_notify = itransport->transport_notify;
+ packetizer->transport_query = itransport->transport_query;
+ packetizer->get_session = itransport->get_session;
+ packetizer->set_session = itransport->set_session;
+
+
+ /* - - - - - - - - - - - - - - -
+ * i_sessiondata
+ * - - - - - - - - - - - - - - -
+ */
+ isession = new_session_interface (packetizer,
+ (etch_session_control) etch_pktizer_session_control,
+ (etch_session_notify) etch_pktizer_session_notify,
+ (etch_session_query) etch_pktizer_session_query);
+
+ /* instantiate i_sessiondata interface which packetizer implements */
+ packetizer->sessiondata = new_sessiondata_interface(packetizer,
+ etch_pktizer_session_data,
+ isession); /* sessiondata now owns isession */
+
+ /* copy session interface to parent */
+ packetizer->session_data = etch_pktizer_session_data;
+ packetizer->session_control = isession->session_control;
+ packetizer->session_notify = isession->session_notify;
+ packetizer->session_query = isession->session_query;
+
+ /* finally set session of next lower layer (tcp connection) to our session */
+ /* fyi we must pass the implementor of transport as thisx, i.e. tcpconnection */
+ packetizer->transport->set_session (packetizer->transport->thisx, packetizer->sessiondata);
+
+ result = 0;
+
+ } while(0);
+
+
+ if (-1 == result)
+ {
+ if (packetizer)
+ packetizer->destroy(packetizer);
+
+ packetizer = NULL;
+ }
+
+ return packetizer;
+}
+
+
+/*
+ * destroy_packetizer()
+ * etch_packetizer destructor
+ */
+int destroy_packetizer(etch_packetizer* thisx)
+{
+ if (NULL == thisx) return -1;
+ if (thisx->refcount > 0 && --thisx->refcount > 0) return -1;
+
+ if (!is_etchobj_static_content(thisx))
+ {
+ if (thisx->sessiondata)
+ thisx->sessiondata->destroy(thisx->sessiondata);
+
+ if (thisx->transportpkt)
+ thisx->transportpkt->destroy(thisx->transportpkt);
+
+ if (thisx->datalock)
+ thisx->datalock->destroy(thisx->datalock);
+
+ if (thisx->savebuf)
+ thisx->savebuf->destroy(thisx->savebuf);
+ }
+
+ return destroy_objectex((objmask*)thisx);
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_transportpacket
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etch_pktizer_transport_packet()
+ * delivers packet to transport after adding packet header.
+ * @param whoto recipient - caller retains this memory.
+ * @param fbuf buffer positioned at the packet data, with space for header
+ * @return 0 success, -1 error
+ */
+int etch_pktizer_transport_packet(etch_packetizer* thisx, etch_who* whoto, etch_flexbuffer* fbuf)
+{
+ int result = -1;
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+
+ do
+ { size_t saveindex = 0;
+ const int datalen = (int) etch_flexbuf_avail(fbuf);
+
+ if (datalen < ETCH_PKTIZER_HEADERSIZE) break;
+
+ saveindex = fbuf->index;
+
+ etch_flexbuf_put_int(fbuf, ETCH_PKTIZER_SIG);
+ etch_flexbuf_put_int(fbuf, datalen - ETCH_PKTIZER_HEADERSIZE);
+
+ fbuf->index = saveindex;
+
+ /* deliver packet buffer to transport */
+ result = thisx->transport->transport_data(thisx->transport->thisx, whoto, fbuf);
+
+ } while(0);
+
+ return result;
+}
+
+
+/**
+ * etch_pktizer_get_session()
+ * @return a reference to the packetizer i_sessiondata interface.
+ * this object is owned by whatever object set the packetizer session.
+ */
+i_sessionpacket* etch_pktizer_get_session (etch_packetizer* thisx)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ return thisx->session;
+}
+
+
+/**
+ * etch_pktizer_set_session()
+ * set packetizer session interface presumably to that of the next higher layer.
+ * @param session an i_sessionpacket reference. caller owns this object.
+ */
+void etch_pktizer_set_session (etch_packetizer* thisx, i_sessionpacket* newsession)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ ETCH_ASSERT(is_etch_sessionpacket(newsession));
+ thisx->session = newsession;
+}
+
+
+/**
+ * etch_pktizer_transport_control()
+ * i_transportpacket::transport_control override.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ */
+int etch_pktizer_transport_control (etch_packetizer* thisx, etch_event* control, objmask* value)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ /* pzr itd pzr itd tcpconx */
+ return thisx->transport->transport_control (thisx->transport->thisx, control, value);
+}
+
+
+/**
+ * etch_pktizer_transport_notify()
+ * i_transportpacket::transport_notify override.
+ * @param evt, caller relinquishes.
+ */
+int etch_pktizer_transport_notify (etch_packetizer* thisx, etch_event* evt)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ return thisx->transport->transport_notify (thisx->transport->thisx, evt);
+}
+
+
+/**
+ * etch_pktizer_transport_query()
+ * i_transportpacket::transport_query override.
+ * @param query, caller relinquishes.
+ */
+objmask* etch_pktizer_transport_query (etch_packetizer* thisx, objmask* query)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ return thisx->transport->transport_query (thisx->transport->thisx, query);
+}
+
+
+/* - - - - - - - - - - - - - - -
+ * i_sessiondata
+ * - - - - - - - - - - - - - - -
+ */
+
+/**
+ * etch_pktizer_session_data()
+ * i_sessiondata::session_data override.
+ * delivers data to the session from the transport
+ * @param whofrom from who sent the packet data
+ * caller retains this memory, can be null.
+ * @param fbuf the packet from the packet source positioned at the data, caller retains.
+ * @return 0 success, -1 error.
+ */
+int etch_pktizer_session_data (etch_packetizer* pzr, etch_who* whofrom, etch_flexbuffer* fbuf)
+{
+ /* two scenarios: the first is that we have no buffered data and the
+ * entire packet is contained within the buffer. in that case we can skip
+ * the details and ship the packet directly to the handler.
+ */
+ size_t curlen, curndx, bytes_avail, fbuf_avail, packet_bodylen, bytes_put;
+ int bytes_needed;
+ etch_flexbuffer* savebuf = NULL;
+ ETCH_ASSERT(is_etch_packetizer(pzr));
+ savebuf = pzr->savebuf;
+
+ while(1) /* while(fbuf.avail() > 0 ... */
+ { if (0 >= (fbuf_avail = etch_flexbuf_avail(fbuf))) break;
+
+ bytes_avail = savebuf->datalen + fbuf_avail;
+
+ if (pzr->is_wantheader)
+ {
+ if (bytes_avail >= (unsigned) ETCH_PKTIZER_HEADERSIZE)
+ { /* there are enough bytes in the pipeline for a header */
+ if (savebuf->datalen == 0)
+ { /* save buffer is empty, entire header in fbuf */
+ packet_bodylen = etch_pktizer_process_header(pzr, fbuf, FALSE);
+ }
+ else /* save non empty, header split across save and buf, so ... */
+ { /* move enough bytes from buf to save to complete a header */
+ bytes_needed = ETCH_PKTIZER_HEADERSIZE - (int) savebuf->datalen;
+
+ bytes_put = etch_flexbuf_put_from (savebuf, fbuf, bytes_needed);
+ if (0 == bytes_put) return -1;
+ etch_flexbuf_set_index(savebuf, 0);
+ packet_bodylen = etch_pktizer_process_header(pzr, savebuf, TRUE);
+ }
+
+ if (packet_bodylen == -1) return -1; /* bad header */
+ if (packet_bodylen == 0) continue; /* header with empty body */
+ pzr->bodylength = packet_bodylen;
+ pzr->is_wantheader = FALSE;
+ }
+ else /* want header, but too few bytes in the pipeline */
+ { /* ... so save fbuf to the save buffer */
+ etch_flexbuf_set_index (savebuf, savebuf->datalen);
+ etch_flexbuf_put_from(savebuf, fbuf, -1);
+ }
+ }
+ else /* didn't need a header, so ... */
+ if (bytes_avail >= pzr->bodylength)
+ {
+ /* we need the body, and there are enough bytes in the pipeline.
+ * three scenarios: the body is entirely in savebuf, the body is
+ * split, or the body is entirely in fbuf. assert that the body
+ * is not entirely in save, or we'd have processed it last time.
+ */
+ ETCH_ASSERT(savebuf->datalen < pzr->bodylength);
+
+ if (savebuf->datalen == 0)
+ { /* saved buffer is empty, entire body is in fbuf */
+ curlen = fbuf->datalen;
+ curndx = fbuf->index;
+ etch_flexbuf_set_length(fbuf, curndx + pzr->bodylength);
+
+ /* send the packet in the input buffer up the chain to the next
+ * higher layer of the transport stack. fyi packetizer.session.thisx
+ * is that layer object, ordinarily the messagizer.
+ */
+ pzr->session->session_packet (pzr->session->thisx, whofrom, fbuf);
+
+ /* fyi the input buffer can contain a partial packet, multiple
+ * packets, or whatever, so we may not be done with it yet. */
+ etch_flexbuf_set_length(fbuf, curlen);
+ etch_flexbuf_set_index (fbuf, curndx + pzr->bodylength);
+ pzr->is_wantheader = TRUE;
+ }
+ else
+ { /* savebuf.datalen not zero, so body is split across the save buffer
+ * and the input buffer. move enough bytes from input buffer fbuf
+ * to complete the packet body.
+ */
+ bytes_needed = (int) (pzr->bodylength - savebuf->datalen);
+ bytes_put = etch_flexbuf_put_from (savebuf, fbuf, bytes_needed);
+ if (bytes_put <= 0) return -1;
+ etch_flexbuf_set_index(savebuf, 0);
+
+ /* send the newly-isolated packet up the chain to the next higher
+ * layer of the transport stack. fyi packetizer.session.thisx
+ * is that layer object, ordinarily the messagizer.
+ */
+ pzr->session->session_packet (pzr->session->thisx, whofrom, savebuf);
+
+ etch_flexbuf_reset(savebuf);
+ pzr->is_wantheader = TRUE;
+ }
+ }
+ else /* need body but too few bytes in pipeline to complete it */
+ { /* ... so save the input buffer to the save buffer */
+ bytes_put = etch_flexbuf_put_from (savebuf, fbuf, ETCH_FLEXBUF_PUT_ALL);
+ }
+ }
+
+ /* buffer is now empty and we're done */
+ return fbuf_avail == 0? 0: -1;
+}
+
+
+/*
+ * etch_pktizer_process_header()
+ * extract header information from packet
+ * returns data length from header, or -1 if bad header
+ */
+int etch_pktizer_process_header (etch_packetizer* pzr, etch_flexbuffer* fbuf, const int is_reset)
+{
+ int curlen = 0, signature = 0;
+ int result = etch_flexbuf_get_int(fbuf, &signature);
+ if (-1 == result || ETCH_PKTIZER_SIG != signature) return -1;
+
+ result = etch_flexbuf_get_int(fbuf, &curlen);
+
+ if (is_reset)
+ etch_flexbuf_reset(fbuf);
+
+ return result == -1 || curlen < 0 || curlen > (int) pzr->maxpacketsize?
+ -1: curlen;
+}
+
+
+/**
+ * etch_pktizer_session_control()
+ * i_sessiondata::session_control override.
+ * @param control event, caller relinquishes.
+ * @param value control value, caller relinquishes.
+ */
+int etch_pktizer_session_control (etch_packetizer* thisx, etch_event* control, objmask* value)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ return thisx->session->session_control (thisx->session->thisx, control, value);
+}
+
+
+/**
+ * etch_pktizer_session_notify()
+ * i_sessiondata::session_notify override.
+ * @param event, caller relinquishes.
+ */
+int etch_pktizer_session_notify (etch_packetizer* thisx, etch_event* evt)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ return thisx->session->session_notify (thisx->session->thisx, evt);
+}
+
+
+/**
+ * etch_pktizer_session_query()
+ * i_sessiondata::session_query override.
+ * @param query, caller relinquishes.
+ */
+objmask* etch_pktizer_session_query (etch_packetizer* thisx, objmask* query)
+{
+ ETCH_ASSERT(is_etch_packetizer(thisx));
+ return thisx->session->session_query (thisx->session->thisx, query);
+}