You are viewing a plain text version of this content. The canonical link for it is here.
Posted to savan-dev@ws.apache.org by da...@apache.org on 2007/06/20 09:25:33 UTC

svn commit: r548965 - in /webservices/savan/trunk/c: include/ samples/server/publisher/ src/client/ src/core/ src/handlers/ src/msgreceivers/ src/subscribers/ src/util/

Author: damitha
Date: Wed Jun 20 00:25:32 2007
New Revision: 548965

URL: http://svn.apache.org/viewvc?view=rev&rev=548965
Log:
More work on savan to improve subscription manager

Modified:
    webservices/savan/trunk/c/include/savan_constants.h
    webservices/savan/trunk/c/include/savan_publishing_client.h
    webservices/savan/trunk/c/include/savan_subscriber.h
    webservices/savan/trunk/c/include/savan_util.h
    webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c
    webservices/savan/trunk/c/src/client/savan_publishing_client.c
    webservices/savan/trunk/c/src/core/savan_sub_processor.c
    webservices/savan/trunk/c/src/handlers/savan_out_handler.c
    webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c
    webservices/savan/trunk/c/src/subscribers/savan_subscriber.c
    webservices/savan/trunk/c/src/util/savan_util.c

Modified: webservices/savan/trunk/c/include/savan_constants.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_constants.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_constants.h (original)
+++ webservices/savan/trunk/c/include/savan_constants.h Wed Jun 20 00:25:32 2007
@@ -52,9 +52,13 @@
 #define SAVAN_ACTIONS_GET_STATUS "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus"
 #define SAVAN_ACTIONS_GET_STATUS_RESPONSE "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse"
 
-#define SUBSCRIBER_STORE "SubscriberStore"
 #define EVENTING_NAMESPACE "http://schemas.xmlsoap.org/ws/2004/08/eventing"
 #define EVENTING_NS_PREFIX "wse"
+#define SAVAN_NAMESPACE "http://ws.apache.org/savan"
+#define SAVAN_NS_PREFIX "savan"
+#define ELEM_NAME_SUBSCRIBERS "Subscribers"
+#define ELEM_NAME_ADD_SUBSCRIBER "AddSubscriber"
+#define ELEM_NAME_TOPIC "Topic"
 #define DEFAULT_DELIVERY_MODE "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push"
 
 /* Eventing element names */
@@ -86,6 +90,7 @@
 #define SAVAN_OP_KEY_FILTER         "savan_op_key_filter"
 
 #define SAVAN_KEY_SUB_ID            "savan_key_subscriber_id"
+#define SAVAN_SUBSCRIBER_LIST       "savan_subs_list"
 
 /** @} */
 #ifdef __cplusplus

Modified: webservices/savan/trunk/c/include/savan_publishing_client.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_publishing_client.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_publishing_client.h (original)
+++ webservices/savan/trunk/c/include/savan_publishing_client.h Wed Jun 20 00:25:32 2007
@@ -45,6 +45,7 @@
      * Publish the given message to all subscribed clients
      * @param client the publishing client object
      * @param env pointer to environment struct
+     * @param payload
      * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE 
      */
     AXIS2_EXTERN axis2_status_t AXIS2_CALL
@@ -56,8 +57,7 @@
     AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
     savan_publishing_client_create(
         const axutil_env_t *env,
-        axis2_conf_ctx_t *conf_ctx,
-        axis2_svc_t *svc);
+        axutil_hash_t *subscriber_list);
 
 /** @} */
 #ifdef __cplusplus

Modified: webservices/savan/trunk/c/include/savan_subscriber.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_subscriber.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_subscriber.h (original)
+++ webservices/savan/trunk/c/include/savan_subscriber.h Wed Jun 20 00:25:32 2007
@@ -73,7 +73,18 @@
         savan_subscriber_t *subscriber,
         const axutil_env_t *env,
         axis2_endpoint_ref_t *end_to);
-                
+ 
+    /**
+     * Get EndTo end point.
+     * @param subscriber pointer to subscriber
+     * @param env pointer to environment struct
+     * @return end_to 
+     */
+    axis2_endpoint_ref_t *AXIS2_CALL
+    savan_subscriber_get_end_to(
+        savan_subscriber_t *subscriber,
+        const axutil_env_t *env);
+               
     /**
      * Set NotifyTo end point.
      * @param subscriber pointer to subscriber
@@ -87,6 +98,17 @@
         axis2_endpoint_ref_t *notify_to);
 
     /**
+     * Get NotifyTo end point.
+     * @param subscriber pointer to subscriber
+     * @param env pointer to environment struct
+     * @return notify_to
+     */
+    axis2_endpoint_ref_t *AXIS2_CALL
+    savan_subscriber_get_notify_to(
+        savan_subscriber_t *subscriber,
+        const axutil_env_t *env);
+
+    /**
      * Set delivery mode.
      * @param subscriber pointer to subscriber
      * @param env pointer to environment struct
@@ -114,7 +136,7 @@
      * Get expires.
      * @param subscriber pointer to subscriber
      * @param env pointer to environment struct
-     * @return expire time on success, else NULL
+     * @return expire date and time as string
      */
     axis2_char_t * AXIS2_CALL
     savan_subscriber_get_expires(
@@ -134,17 +156,28 @@
         const axis2_char_t *filter);
 
     /**
+     * Get filter.
+     * @param subscriber pointer to subscriber
+     * @param env pointer to environment struct
+     * @return filter the filter string
+     */
+    axis2_char_t *AXIS2_CALL
+    savan_subscriber_get_filter(
+        savan_subscriber_t *subscriber,
+        const axutil_env_t *env);
+
+    /**
      * Publishes the given msg to the client.
      * @param subscriber pointer to subscriber
      * @param env pointer to environment struct
-     * @param msg_ctx the msg to be published
+     * @param payload the content to be published
      * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE 
      */
     axis2_status_t AXIS2_CALL
     savan_subscriber_publish(
         savan_subscriber_t *subscriber,
         const axutil_env_t *env,
-        struct axis2_msg_ctx *msg_ctx);
+        const void *payload);
 
     /**
      * Set whether the subscription is renewed or not.
@@ -180,6 +213,17 @@
     savan_subscriber_create(
         const axutil_env_t *env);
     
+    axis2_status_t AXIS2_CALL
+        savan_subscriber_set_topic(
+        savan_subscriber_t *subscriber,
+        const axutil_env_t *env,
+        axis2_char_t *topic);
+
+    axis2_char_t *AXIS2_CALL
+    savan_subscriber_get_topic(
+        savan_subscriber_t *subscriber,
+        const axutil_env_t *env);
+
 /** @} */
 #ifdef __cplusplus
 }

Modified: webservices/savan/trunk/c/include/savan_util.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_util.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_util.h (original)
+++ webservices/savan/trunk/c/include/savan_util.h Wed Jun 20 00:25:32 2007
@@ -82,6 +82,19 @@
         axis2_msg_ctx_t *msg_ctx);
 
     /**
+    * Add the subscriber to subscription manager services' store
+    * @param env pointer to environment struct
+    * @param msg_ctx pointer to message context
+    * @param subscriber
+    * @return the store on success, else NULL
+    */
+    axis2_status_t AXIS2_CALL
+    savan_util_add_subscriber(
+        const axutil_env_t *env,
+        axis2_msg_ctx_t *msg_ctx,
+        savan_subscriber_t *subscriber);
+
+    /**
     * Calculate and return an expiry time for the subscription
     * @param env pointer to environment struct
     * @return the expiry time on success, else NULL
@@ -101,6 +114,17 @@
     savan_util_get_renewed_expiry_time(
         const axutil_env_t *env,
         axis2_char_t *expiry);
+    
+    /**
+    * Create storage hash and set as a service parameter.
+    * @param env pointer to environment struct
+    * @param svc subscription service
+    * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE 
+    */
+    AXIS2_EXTERN axis2_status_t AXIS2_CALL 
+    savan_util_set_sub_store(
+        axis2_svc_t *svc,
+        const axutil_env_t *env);
 
 /** @} */
 #ifdef __cplusplus

Modified: webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c (original)
+++ webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c Wed Jun 20 00:25:32 2007
@@ -40,6 +40,7 @@
     axutil_env_t *env;
     axis2_svc_t *svc;
     axis2_conf_ctx_t *conf_ctx;
+    axutil_hash_t *subscriber_list;
 }publisher_data_t;
 
 int AXIS2_CALL
@@ -54,16 +55,23 @@
  * This method invokes the right service method 
  */
 axiom_node_t* AXIS2_CALL 
-publisher_invoke(axis2_svc_skeleton_t *svc_skeleton,
-            const axutil_env_t *env,
-            axiom_node_t *node,
-            axis2_msg_ctx_t *msg_ctx);
-            
+publisher_invoke(
+    axis2_svc_skeleton_t *svc_skeleton,
+    const axutil_env_t *env,
+    axiom_node_t *node,
+    axis2_msg_ctx_t *msg_ctx);
+        
 
 int AXIS2_CALL 
 publisher_init(axis2_svc_skeleton_t *svc_skeleton,
           const axutil_env_t *env);
 
+int AXIS2_CALL 
+publisher_init_with_conf(
+    axis2_svc_skeleton_t *svc_skeleton,
+    const axutil_env_t *env,
+    axis2_conf_t *conf);
+
 axiom_node_t* AXIS2_CALL
 publisher_on_fault(axis2_svc_skeleton_t *svc_skeli, 
               const axutil_env_t *env, axiom_node_t *node);
@@ -77,7 +85,8 @@
     publisher_init,
     publisher_invoke,
     publisher_on_fault,
-    publisher_free
+    publisher_free,
+    publisher_init_with_conf
 };
     
 /*Create function */
@@ -126,14 +135,17 @@
  * This method invokes the right service method 
  */
 axiom_node_t* AXIS2_CALL
-publisher_invoke(axis2_svc_skeleton_t *svc_skeleton,
-            const axutil_env_t *env,
-            axiom_node_t *node,
-            axis2_msg_ctx_t *msg_ctx)
+publisher_invoke(
+    axis2_svc_skeleton_t *svc_skeleton,
+    const axutil_env_t *env,
+    axiom_node_t *node,
+    axis2_msg_ctx_t *msg_ctx)
 {
 
 	axutil_thread_t *worker_thread = NULL;
 	publisher_data_t *data = NULL;
+    axutil_param_t *param = NULL;
+    axutil_hash_t *subs_list = NULL;
 
     printf("publisher invoke called.\n");
 
@@ -145,6 +157,9 @@
     data->env = (axutil_env_t*)env;
     data->svc =  axis2_msg_ctx_get_svc(msg_ctx, env);
     data->conf_ctx =  axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+    param = axis2_svc_get_param(data->svc, env, SAVAN_SUBSCRIBER_LIST);
+    subs_list = axutil_param_get_value(param, env);
+    data->subscriber_list = subs_list;
     
     worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
         publisher_worker_func, (void*)data);
@@ -212,11 +227,13 @@
     axis2_conf_ctx_t *conf_ctx = NULL;
     axis2_svc_t *svc = NULL;
     savan_publishing_client_t *pub_client = NULL;
+    axutil_hash_t *subs_list = NULL;
     
     publisher_data_t *mydata = (publisher_data_t*)data;
     main_env = mydata->env;
     conf_ctx = mydata->conf_ctx;
     svc = mydata->svc;
+    subs_list = mydata->subscriber_list;
     
     env = axutil_init_thread_env(main_env);
 
@@ -229,7 +246,7 @@
     
     axiom_element_set_text(test_elem, env, "test data", test_node);
 
-    pub_client = savan_publishing_client_create(env, conf_ctx, svc);
+    pub_client = savan_publishing_client_create(env, subs_list);
     
     while(1)
     {

Modified: webservices/savan/trunk/c/src/client/savan_publishing_client.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/client/savan_publishing_client.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/client/savan_publishing_client.c (original)
+++ webservices/savan/trunk/c/src/client/savan_publishing_client.c Wed Jun 20 00:25:32 2007
@@ -18,6 +18,7 @@
 #include <axiom_element.h>
 #include <axiom_soap_body.h>
 #include <axis2_options.h>
+#include <axutil_array_list.h>
 #include <platforms/axutil_platform_auto_sense.h>
 
 #include <savan_publishing_client.h>
@@ -25,8 +26,7 @@
 
 struct savan_publishing_client_t
 {
-    axis2_conf_ctx_t *conf_ctx;
-    axis2_svc_t *svc;
+    axutil_hash_t *subscriber_list;
 };
 
 /******************************************************************************/
@@ -37,8 +37,7 @@
 AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
 savan_publishing_client_create(
     const axutil_env_t *env,
-    axis2_conf_ctx_t *conf_ctx,
-    axis2_svc_t *svc)
+    axutil_hash_t *subscriber_list)
 {
     savan_publishing_client_t *client = NULL;
     
@@ -51,9 +50,8 @@
         AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
         return NULL;        
     }
-    
-    client->conf_ctx = conf_ctx;
-    client->svc = svc;
+    if(subscriber_list) 
+        client->subscriber_list = subscriber_list;
 
     return client;
 }
@@ -72,6 +70,7 @@
     axis2_options_t *options = NULL;
     axis2_svc_client_t* svc_client = NULL;
     axutil_qname_t *op_qname = NULL;
+    axutil_property_t *property = NULL;
 
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
 
@@ -95,9 +94,7 @@
     op_qname = axutil_qname_create(env, "publish", NULL, NULL);
         
     /* Create service client */
-    svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, repo_path,
-        client->conf_ctx, client->svc);
-    
+    svc_client = axis2_svc_client_create(env, repo_path); 
     if (!svc_client)
     {
         AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to create a"
@@ -105,6 +102,9 @@
         return AXIS2_FAILURE;
     }
 
+    property = axutil_property_create_with_args(env, 0, 1,
+        axutil_hash_free_void_arg, client->subscriber_list);
+    axis2_options_set_property(options, env, SAVAN_SUBSCRIBER_LIST, property);
     /* Set service client options */
     axis2_svc_client_set_options(svc_client, env, options);
 
@@ -115,3 +115,4 @@
     
     return AXIS2_SUCCESS;
 }
+

Modified: webservices/savan/trunk/c/src/core/savan_sub_processor.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/core/savan_sub_processor.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/core/savan_sub_processor.c (original)
+++ webservices/savan/trunk/c/src/core/savan_sub_processor.c Wed Jun 20 00:25:32 2007
@@ -34,12 +34,7 @@
 };
 
 /* Function Prototypes ********************************************************/
-    
-axis2_status_t AXIS2_CALL 
-savan_sub_processor_set_sub_store(
-    axis2_svc_t *svc,
-    const axutil_env_t *env);
-    
+     
 savan_subscriber_t * AXIS2_CALL 
 savan_sub_processor_create_subscriber_from_msg(
     const axutil_env_t *env,
@@ -93,17 +88,9 @@
     const axutil_env_t *env,
     axis2_msg_ctx_t *msg_ctx)
 {
-    axis2_svc_t *subs_svc = NULL;
-    axutil_param_t *param = NULL;
-    axutil_hash_t *store = NULL;
     savan_subscriber_t *subscriber = NULL;
     axis2_char_t *expires = NULL;
     axis2_char_t *id = NULL;
-    axutil_qname_t *qname = NULL;
-    axis2_module_desc_t *module_desc = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
-    axis2_conf_t *conf = NULL;
-    axis2_char_t *subs_svc_name = NULL;
     
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     
@@ -120,54 +107,17 @@
             AXIS2_FAILURE);
         return AXIS2_FAILURE;
     }    
-
-    /* Set this subscriber inside a subscriber store maintained in the svc */
-
-    conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
-    conf = axis2_conf_ctx_get_conf(conf_ctx, env);
-    qname = axutil_qname_create(env, "savan", NULL, NULL);
-    module_desc = axis2_conf_get_module(conf, env, qname);
-    axutil_qname_free(qname, env);
-    param = axis2_module_desc_get_param(module_desc, env, "SubscriptionMgrName");
-    if(param)
-    {
-        subs_svc_name = axutil_param_get_value(param, env);
-        subs_svc = axis2_conf_get_svc(conf, env, subs_svc_name);
-    }
-    if(!subs_svc)
-    {
-        subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
-    }
-    if (!subs_svc)
-    {
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract "
-            "the service"); 
-        return AXIS2_FAILURE;
-    }
-    
-    param = axis2_svc_get_param(subs_svc, env, SUBSCRIBER_STORE);
-    if (!param)
-    {
-        /* Store not found. Create and set it as a param */
-        savan_sub_processor_set_sub_store(subs_svc, env);
-        param = axis2_svc_get_param(subs_svc, env, SUBSCRIBER_STORE);
-    }
-    
-    store = (axutil_hash_t*)axutil_param_get_value(param, env);
-         
     /* Set the expiry time on the subscription */
     /* TODO : For now we are ignoring the Expiry sent by the client. Add support
      * to consider this when setting the expiry time */
     expires = savan_util_get_expiry_time(env);
     savan_subscriber_set_expires(subscriber, env, expires);
 
-    /* Store the created subscriber in the svc */
-    axutil_hash_set(store, savan_subscriber_get_id(subscriber, env),
-        AXIS2_HASH_KEY_STRING, subscriber);
-
     /* Store sub id in msg ctx to be used by the msg receiver */
     id = savan_subscriber_get_id(subscriber, env);
     savan_sub_processor_set_sub_id_to_msg_ctx(env, msg_ctx, id);
+
+    savan_util_add_subscriber(env, msg_ctx, subscriber);
     
     return AXIS2_SUCCESS;
 }
@@ -289,40 +239,6 @@
 
 /******************************************************************************/
 
-axis2_status_t AXIS2_CALL 
-savan_sub_processor_set_sub_store(
-    axis2_svc_t *svc,
-    const axutil_env_t *env)
-{
-    axutil_hash_t *store = NULL;
-    axutil_param_t *param = NULL;
-    
-    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
-    
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
-        "set sub store...");
-    
-    /* Create a hash map */
-    store = axutil_hash_make(env);
-    if (!store)
-    {
-        /* TODO : error reporting */
-        return AXIS2_FAILURE;
-    }
-    
-    /* Add the hash map as a parameter to the given service */
-    param = axutil_param_create(env, SUBSCRIBER_STORE, (void*)store);
-    if (!param)
-    {
-        /* TODO : error reporting */
-        return AXIS2_FAILURE;
-    }
-    
-    axis2_svc_add_param(svc, env, param);
-    
-    return AXIS2_SUCCESS;       
-}
-
 /******************************************************************************/
 
 savan_subscriber_t * AXIS2_CALL 
@@ -356,9 +272,11 @@
     axis2_char_t *notify = NULL;
     axis2_char_t *expires = NULL;
     axis2_char_t *filter = NULL;
+    axis2_char_t *topic = NULL;
     
     axis2_endpoint_ref_t *endto_epr = NULL;
     axis2_endpoint_ref_t *notify_epr = NULL;
+    axis2_endpoint_ref_t *topic_epr = NULL;
     
     AXIS2_ENV_CHECK(env, NULL);
     
@@ -460,6 +378,9 @@
     
     savan_subscriber_set_filter(subscriber, env, filter);
     
+    topic_epr = axis2_msg_ctx_get_to(msg_ctx, env);
+    topic = (axis2_char_t *)axis2_endpoint_ref_get_address(topic_epr, env);
+    savan_subscriber_set_topic(subscriber, env, topic);
     return subscriber;    
 }
 

Modified: webservices/savan/trunk/c/src/handlers/savan_out_handler.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/handlers/savan_out_handler.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/handlers/savan_out_handler.c (original)
+++ webservices/savan/trunk/c/src/handlers/savan_out_handler.c Wed Jun 20 00:25:32 2007
@@ -23,6 +23,7 @@
 #include <axiom_soap_const.h>
 #include <axiom_soap_envelope.h>
 #include <axiom_soap_header.h>
+#include <axiom_soap_body.h>
 #include <axiom_soap_header_block.h>
 #include <axis2_op.h>
 #include <axis2_msg_ctx.h>
@@ -79,60 +80,40 @@
     struct axis2_msg_ctx *msg_ctx)
 {
     savan_message_types_t msg_type = SAVAN_MSG_TYPE_UNKNOWN;
-    /*axis2_svc_t *svc = NULL;*/
-    axutil_param_t *param = NULL;
-    axutil_hash_t *store = NULL;
-    const axis2_svc_t *svc = NULL;
-    const axis2_char_t *svc_name = NULL;
-    axutil_hash_index_t *hi = NULL;
-    void *val = NULL;
     
     AXIS2_ENV_CHECK( env, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
     
-    svc =  axis2_msg_ctx_get_svc(msg_ctx, env);
-    if (svc)
-        svc_name = axis2_svc_get_name (svc, env);
-    
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[%s][savan][out handler] invoke...",
-        svc_name);
-    
     /* Determine the eventing msg type */
     msg_type = savan_util_get_message_type(msg_ctx, env);
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[%s][savan][out handler] msg type:"
-        " %d", svc_name, msg_type);
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][out handler] msg type:"
+        " %d", msg_type);
     if (msg_type == SAVAN_MSG_TYPE_UNKNOWN)
     {
+        axutil_property_t *subs_list_property = NULL;
+        axutil_hash_t *subscriber_list = NULL;
+        axutil_hash_index_t *hi = NULL;
+        void *val = NULL;
         /* Treat unknown msgs as msgs for publishing */
-        
-        svc =  axis2_msg_ctx_get_svc(msg_ctx, env);
-        if (!svc)
-        {
-            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
-                "Service not found");
-            return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
-        }
-        
-        param = axis2_svc_get_param(svc, env, SUBSCRIBER_STORE);
-        if (!param)
+        subs_list_property = axis2_msg_ctx_get_property(msg_ctx, env, 
+            SAVAN_SUBSCRIBER_LIST);
+        if(subs_list_property)
+            subscriber_list = axutil_property_get_value(subs_list_property, env);    
+        if (!subscriber_list)
         {
             AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
-                "Subscribe store not found");
-            return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
-        }
-        
-        store = (axutil_hash_t*)axutil_param_get_value(param, env);
-        if (!store)
-        {
-            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
-                "Subscribe store is null");
+                "Subscribe subscriber_list is null");
             return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
         }
         
         /* Iterate the subscribe store and send the msg to each one */
-        
-        for (hi = axutil_hash_first(store, env); hi; hi = axutil_hash_next(env, hi))
+
+        for (hi = axutil_hash_first(subscriber_list, env); hi; hi = 
+            axutil_hash_next(env, hi))
         {
+            axiom_soap_envelope_t *soap_env = NULL;
+            axiom_soap_body_t *soap_body = NULL;
+            axiom_node_t *payload = NULL;
             savan_subscriber_t * sub = NULL;
             axutil_hash_this(hi, NULL, NULL, &val);
             sub = (savan_subscriber_t *)val;
@@ -141,14 +122,19 @@
                 axis2_char_t *id = savan_subscriber_get_id(sub, env);
                 AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][out handler] "
                     "Publishing to %s...", id);
-                savan_subscriber_publish(sub, env, msg_ctx);
+                soap_env = axis2_msg_ctx_get_soap_envelope(msg_ctx, env);
+                soap_body = axiom_soap_envelope_get_body(soap_env, env);
+                payload = axiom_soap_body_get_base_node(soap_body, env);
+                savan_subscriber_publish(sub, env, payload);
             }
-        
+
             val = NULL;
         }
 
-         axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
+
+        axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
     }
        
     return AXIS2_SUCCESS;
 }
+

Modified: webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c (original)
+++ webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c Wed Jun 20 00:25:32 2007
@@ -186,7 +186,7 @@
     axis2_conf_ctx_t *conf_ctx = NULL;
     axis2_conf_t *conf = NULL;
     axis2_module_desc_t *module_desc = NULL;
-    savan_subscriber_t *subscriber = NULL;
+    /*savan_subscriber_t *subscriber = NULL;*/
     
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][msg recv] "
         "handle sub request...");
@@ -248,8 +248,8 @@
     axiom_element_set_text(id_elem, env, id, id_node);
     
     /* Expires element. Get expiry time from subscriber and set */
-    subscriber = savan_util_get_subscriber_from_msg(env, msg_ctx, id);
-    expires = savan_subscriber_get_expires(subscriber, env);
+    /*subscriber = savan_util_get_subscriber_from_msg(env, msg_ctx, id);
+    expires = savan_subscriber_get_expires(subscriber, env);*/
     
     expires_elem = axiom_element_create(env, response_node, ELEM_NAME_EXPIRES, ns,
         &expires_node);

Modified: webservices/savan/trunk/c/src/subscribers/savan_subscriber.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/subscribers/savan_subscriber.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/subscribers/savan_subscriber.c (original)
+++ webservices/savan/trunk/c/src/subscribers/savan_subscriber.c Wed Jun 20 00:25:32 2007
@@ -30,6 +30,7 @@
     axis2_char_t *delivery_mode;
     axis2_char_t *expires;
     axis2_char_t *filter;
+    axis2_char_t *topic;
     axis2_bool_t renewed;
 };
 
@@ -112,6 +113,14 @@
     return AXIS2_SUCCESS;
 }
 
+axis2_endpoint_ref_t *AXIS2_CALL
+savan_subscriber_get_end_to(
+    savan_subscriber_t *subscriber,
+    const axutil_env_t *env)
+{
+    return subscriber->end_to;
+}
+
 /******************************************************************************/
 
 axis2_status_t AXIS2_CALL
@@ -127,6 +136,14 @@
     return AXIS2_SUCCESS;
 }    
             
+axis2_endpoint_ref_t *AXIS2_CALL
+savan_subscriber_get_notify_to(
+    savan_subscriber_t *subscriber,
+    const axutil_env_t *env)
+{
+    return subscriber->notify_to;
+}
+
 /******************************************************************************/
 
 axis2_status_t AXIS2_CALL
@@ -208,54 +225,45 @@
     return AXIS2_SUCCESS;
 }
 
+axis2_char_t *AXIS2_CALL
+savan_subscriber_get_filter(
+    savan_subscriber_t *subscriber,
+    const axutil_env_t *env)
+{
+    return subscriber->filter;
+}
+
 /******************************************************************************/
 
 axis2_status_t AXIS2_CALL
 savan_subscriber_publish(
     savan_subscriber_t *subscriber,
     const axutil_env_t *env,
-    struct axis2_msg_ctx *msg_ctx)
+    const void *payload)
 {
     axis2_svc_client_t *svc_client = NULL;
-    axis2_op_client_t *op_client = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
-    axis2_conf_t *conf = NULL;
-    axis2_svc_t *svc = NULL;
     axis2_char_t *path = NULL;
     axis2_options_t *options = NULL;
     axis2_status_t status = AXIS2_SUCCESS;
-    axutil_qname_t *op_qname = NULL;
+    axis2_endpoint_ref_t *notify_to = NULL;
+    axiom_node_t *ret_node = NULL;
 
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][subscribe] publish...");
 	
     path = AXIS2_GETENV("AXIS2C_HOME");
-    conf_ctx =  axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
-    conf =  axis2_conf_ctx_get_conf(conf_ctx, env);
 
-    /* Get anonymous service from conf. This will be null for the first time, 
-     * but then it will be created when we create the svc_client */
-    svc = axis2_conf_get_svc(conf, env, AXIS2_ANON_SERVICE);
-
-    svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, path,
-        conf_ctx, svc);
+    svc_client = axis2_svc_client_create(env, path);
 
     /* Setup options */
     options = axis2_options_create(env);
-    axis2_options_set_to(options, env, subscriber->notify_to);
-    
+    notify_to = savan_subscriber_get_notify_to(subscriber, env);
+    axis2_options_set_to(options, env, notify_to);
     /* Set service client options */
     axis2_svc_client_set_options(svc_client, env, options);
 
     /* Engage addressing module */
-    /*axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);*/
-    
-    op_qname = axutil_qname_create(env, AXIS2_ANON_OUT_ONLY_OP, NULL, NULL);
-
-    op_client = axis2_svc_client_create_op_client(svc_client, env,
-        op_qname);
-
-    axis2_op_client_add_msg_ctx(op_client, env, msg_ctx);
-    status = axis2_op_client_execute(op_client, env, AXIS2_TRUE);
+    axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
+    ret_node = axis2_svc_client_send_receive(svc_client, env, payload);
     
     return status;
 }
@@ -282,3 +290,26 @@
 {
     return subscriber->renewed;
 }
+
+axis2_status_t AXIS2_CALL
+savan_subscriber_set_topic(
+    savan_subscriber_t *subscriber,
+    const axutil_env_t *env,
+    axis2_char_t *topic)
+{
+    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+    
+    subscriber->topic = topic;
+
+    return AXIS2_SUCCESS;
+}
+
+axis2_char_t *AXIS2_CALL
+savan_subscriber_get_topic(
+    savan_subscriber_t *subscriber,
+    const axutil_env_t *env)
+{
+    return subscriber->topic;
+}
+
+

Modified: webservices/savan/trunk/c/src/util/savan_util.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/util/savan_util.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/util/savan_util.c (original)
+++ webservices/savan/trunk/c/src/util/savan_util.c Wed Jun 20 00:25:32 2007
@@ -15,6 +15,10 @@
  */
 
 #include <axis2_msg_info_headers.h>
+#include <axis2_options.h>
+#include <axis2_svc_client.h>
+#include <axis2_endpoint_ref.h>
+#include <platforms/axutil_platform_auto_sense.h>
 #include <axiom_soap.h>
 
 #include <savan_util.h>
@@ -22,6 +26,33 @@
 
 /******************************************************************************/
 
+static axis2_status_t
+add_subscriber_to_remote_subs_mgr(
+    const axutil_env_t *env,
+    savan_subscriber_t *subscriber,
+    axis2_char_t *subs_mgr_url);
+
+static axutil_hash_t *
+get_subscriber_list_from_remote_subs_mgr(
+    const axutil_env_t *env,
+    axis2_char_t *topic,
+    axis2_char_t *subs_mgr_url);
+
+static axiom_node_t *
+build_add_subscriber_om_payload(
+    const axutil_env_t *env,
+    savan_subscriber_t *subscriber);
+
+static axiom_node_t *
+build_subscribers_request_om_payload(
+    const axutil_env_t *env,
+    axis2_char_t *topic);
+
+static axutil_hash_t *
+process_subscriber_list_node(
+    const axutil_env_t *env,
+    axiom_node_t *subs_list_node);
+
 savan_message_types_t AXIS2_CALL
 savan_util_get_message_type(
     axis2_msg_ctx_t *msg_ctx,
@@ -165,51 +196,490 @@
     axis2_svc_t *subs_svc = NULL;
     axutil_param_t *param = NULL;
     axutil_hash_t *store = NULL;
-    axutil_qname_t *qname = NULL;
-    axis2_module_desc_t *module_desc = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
-    axis2_conf_t *conf = NULL;
     axis2_char_t *subs_svc_name = NULL;
+    axis2_char_t *topic = NULL;
+    axis2_endpoint_ref_t *topic_epr = NULL;
 
     AXIS2_ENV_CHECK(env, NULL);
 
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][util] "
         "get subscriber store...");
 
-    conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
-    conf = axis2_conf_ctx_get_conf(conf_ctx, env);
-    qname = axutil_qname_create(env, "savan", NULL, NULL);
-    module_desc = axis2_conf_get_module(conf, env, qname);
-    axutil_qname_free(qname, env);
-    param = axis2_module_desc_get_param(module_desc, env, "SubscriptionMgrName");
+    subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
+    if (!subs_svc)
+    {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the "
+            "service"); 
+        return NULL;
+    }
+    param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrName");
     if(param)
     {
         subs_svc_name = axutil_param_get_value(param, env);
-        subs_svc = axis2_conf_get_svc(conf, env, subs_svc_name);
     }
-    if(!subs_svc)
+    if(subs_svc_name)
     {
-        subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
+        axis2_char_t *subs_mgr_url = NULL;
+        param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrURL");
+        subs_mgr_url = axutil_param_get_value(param, env);
+        topic_epr = axis2_msg_ctx_get_to(msg_ctx, env);
+        topic = (axis2_char_t *) axis2_endpoint_ref_get_address(topic_epr, env);
+        store = get_subscriber_list_from_remote_subs_mgr(env, topic, subs_mgr_url);
+    }
+    else
+    {
+        param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+        if (!param)
+        {
+            /* Store not found. Create and set it as a param */
+            savan_util_set_sub_store(subs_svc, env);
+            param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+        }
+    
+        store = (axutil_hash_t*)axutil_param_get_value(param, env);
     }
+    return store;
+}
+
+axis2_status_t AXIS2_CALL
+savan_util_add_subscriber(
+    const axutil_env_t *env,
+    axis2_msg_ctx_t *msg_ctx,
+    savan_subscriber_t *subscriber)
+{
+    axis2_svc_t *subs_svc = NULL;
+    axutil_param_t *param = NULL;
+    axutil_hash_t *store = NULL;
+    axis2_char_t *subs_svc_name = NULL;
+
+    subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
     if (!subs_svc)
     {
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the "
-            "service"); 
-        return NULL;
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract "
+            "the service"); 
+        return AXIS2_FAILURE;
+    }
+    param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrName");
+    printf("came10\n");
+    if(param)
+    {
+        subs_svc_name = axutil_param_get_value(param, env);
+    }
+    if(subs_svc_name)
+    {
+        axis2_char_t *subs_mgr_url = NULL;
+        param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrURL");
+        subs_mgr_url = axutil_param_get_value(param, env);
+        add_subscriber_to_remote_subs_mgr(env, subscriber, subs_mgr_url);
     }
+    else
+    {
+        param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+        if (!param)
+        {
+            /* Store not found. Create and set it as a param */
+            savan_util_set_sub_store(subs_svc, env);
+            param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+        }
+    
+        store = (axutil_hash_t*)axutil_param_get_value(param, env);
+        /* Store the created subscriber in the svc */
+        axutil_hash_set(store, savan_subscriber_get_id(subscriber, env), 
+            AXIS2_HASH_KEY_STRING, subscriber);
+    } 
+    
+    return AXIS2_SUCCESS;
+}
 
-    param = axis2_svc_get_param(subs_svc, env, SUBSCRIBER_STORE);
+axis2_status_t AXIS2_CALL 
+savan_util_set_sub_store(
+    axis2_svc_t *svc,
+    const axutil_env_t *env)
+{
+    axutil_hash_t *store = NULL;
+    axutil_param_t *param = NULL;
+    
+    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+    
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+        "Start:set_sub_store");
+    
+    /* Create a hash map */
+    store = axutil_hash_make(env);
+    if (!store)
+    {
+        /* TODO : error reporting */
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+        "Could not create subscriber store");
+        return AXIS2_FAILURE;
+    }
+    
+    /* Add the hash map as a parameter to the given service */
+    param = axutil_param_create(env, SAVAN_SUBSCRIBER_LIST, (void*)store);
     if (!param)
     {
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the "
-            "subscriber store param"); 
+        /* TODO : error reporting */
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+        "Could not create subscriber store param");
+        return AXIS2_FAILURE;
+    }
+    
+    axis2_svc_add_param(svc, env, param);
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+        "End:set_sub_store");
+    
+    return AXIS2_SUCCESS;       
+}
+
+static axis2_status_t
+add_subscriber_to_remote_subs_mgr(
+    const axutil_env_t *env,
+    savan_subscriber_t *subscriber,
+    axis2_char_t *subs_mgr_url)
+{
+    const axis2_char_t *address = NULL;
+    axis2_endpoint_ref_t* endpoint_ref = NULL;
+    axis2_options_t *options = NULL;
+    const axis2_char_t *client_home = NULL;
+    axis2_svc_client_t* svc_client = NULL;
+    axiom_node_t *payload = NULL;
+
+    /* Set up the environment */
+    env = axutil_env_create_all("savan.log", AXIS2_LOG_LEVEL_TRACE);
+
+    /* Set end point reference of echo service */
+    address = subs_mgr_url;
+    printf("[savan] Using endpoint : %s\n", address);
+
+    /* Create EPR with given address */
+    endpoint_ref = axis2_endpoint_ref_create(env, address);
+
+    /* Setup options */
+    options = axis2_options_create(env);
+    axis2_options_set_to(options, env, endpoint_ref);
+    axis2_options_set_action(options, env,
+        "http://ws.apache.org/axis2/c/subscription/add_subscriber");
+
+    /* Set up deploy folder. It is from the deploy folder, the configuration is 
+     * picked up using the axis2.xml file.
+     * In this sample client_home points to the Axis2/C default deploy folder. 
+     * The client_home can be different from this folder on your system. For 
+     * example, you may have a different folder (say, my_client_folder) with its 
+     * own axis2.xml file. my_client_folder/modules will have the modules that 
+     * the client uses
+     */
+    client_home = (const axis2_char_t *) AXIS2_GETENV("AXIS2C_HOME");
+    if (!client_home)
+        client_home = "../../deploy";
+
+    /* Create service client */
+    svc_client = axis2_svc_client_create(env, client_home);
+    if (!svc_client)
+    {
+        printf("Error creating service client\n");
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Stub invoke FAILED: Error code:"
+            " %d :: %s", env->error->error_number,
+            AXIS2_ERROR_GET_MESSAGE(env->error));
+        return -1;
+    }
+    axis2_options_set_soap_version(options, env, AXIOM_SOAP11);
+    /* Set service client options */
+    axis2_svc_client_set_options(svc_client, env, options);    
+    
+    axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
+    payload = build_add_subscriber_om_payload(env, subscriber);
+    /* Send request */
+    axis2_svc_client_send_robust(svc_client, env, payload);
+
+    return AXIS2_SUCCESS;
+}
+
+static axutil_hash_t *
+get_subscriber_list_from_remote_subs_mgr(
+    const axutil_env_t *env,
+    axis2_char_t *topic,
+    axis2_char_t *subs_mgr_url)
+{
+    axis2_endpoint_ref_t* endpoint_ref = NULL;
+    axis2_options_t *options = NULL;
+    const axis2_char_t *client_home = NULL;
+    axis2_svc_client_t* svc_client = NULL;
+    axiom_node_t *payload = NULL;
+    axiom_node_t *ret_node = NULL;
+    axutil_hash_t *subscriber_list = NULL;
+
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+        "[savan] Start:get_subscriber_list_from_remote_subs_mgr");
+    options = axis2_options_create(env);
+    axis2_options_set_action(options, env,
+        "http://ws.apache.org/axis2/c/subscription/get_subscriber_list");
+
+    client_home = AXIS2_GETENV("AXIS2C_HOME");
+    if (!client_home)
+        client_home = "../../deploy";
+
+    svc_client = axis2_svc_client_create(env, client_home);
+    if (!svc_client)
+    {
+        printf("Error creating service client\n");
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+            "[ML] Stub invoke FAILED: Error code:"
+            " %d :: %s", env->error->error_number,
+            AXIS2_ERROR_GET_MESSAGE(env->error));
         return NULL;
     }
+    endpoint_ref = axis2_endpoint_ref_create(env, subs_mgr_url);
+    axis2_options_set_to(options, env, endpoint_ref);
+    printf("[savan] Using endpoint : %s\n", subs_mgr_url);
+    axis2_options_set_soap_version(options, env, AXIOM_SOAP11);
+    axis2_svc_client_set_options(svc_client, env, options);    
+    
+    axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
+    payload = build_subscribers_request_om_payload(env, topic);
+    ret_node = axis2_svc_client_send_receive(svc_client, env, payload);
+    if (ret_node)
+    {
+        subscriber_list = process_subscriber_list_node(env, ret_node);
+    }
+    else
+    {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[ML] Stub invoke FAILED: Error code:"
+            " %d :: %s", env->error->error_number,
+            AXIS2_ERROR_GET_MESSAGE(env->error));
+        printf("Retrieving subscriber list FAILED!\n");
+    }
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+        "[savan] End:get_subscriber_list_from_remote_subs_mgr");
+    return subscriber_list;
+}
 
-    store = (axutil_hash_t*)axutil_param_get_value(param, env);
+static axiom_node_t *
+build_subscribers_request_om_payload(
+    const axutil_env_t *env,
+    axis2_char_t *topic)
+{
+    axiom_node_t *om_node = NULL;
+    axiom_element_t* om_ele = NULL;
+    axiom_node_t* topic_om_node = NULL;
+    axiom_element_t * topic_om_ele = NULL;
+    axiom_namespace_t *ns1 = NULL;
+    axis2_char_t *om_str = NULL;
+
+    ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX);
+    om_ele = axiom_element_create(env, NULL, ELEM_NAME_SUBSCRIBERS, ns1, &om_node);
+    topic_om_ele = axiom_element_create(env, om_node, ELEM_NAME_TOPIC, ns1, 
+        &topic_om_node);
+    axiom_element_set_text(topic_om_ele, env, topic, topic_om_node);
 
-    return store;
+    om_str = axiom_node_to_string(om_node, env);
+    if (om_str)
+    {
+        printf("\nSending OM : %s\n", om_str);
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Sending OM : %s", om_str);
+        AXIS2_FREE(env->allocator, om_str);
+        om_str =  NULL;
+    }
+    return om_node;
+}
+
+static axutil_hash_t *
+process_subscriber_list_node(
+    const axutil_env_t *env,
+    axiom_node_t *subs_list_node)
+{
+    axiom_element_t *subs_list_element = NULL;
+    axiom_children_qname_iterator_t *subs_iter = NULL;
+    axutil_hash_t *subscriber_list = axutil_hash_make(env);
+    axutil_qname_t *qname = NULL;
+
+    subs_list_element = axiom_node_get_data_element(subs_list_node, env); 
+    
+    /* Get Subscriber elements from subscriber list */
+    qname = axutil_qname_create(env, ELEM_NAME_SUBSCRIBE, EVENTING_NAMESPACE, NULL);
+    subs_iter = axiom_element_get_children_with_qname(subs_list_element, env,
+        qname, subs_list_node);
+    if(!subs_iter)
+    {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[ML] Subscribers list is empty");
+        return NULL;
+    }
+    while(axiom_children_qname_iterator_has_next(subs_iter, env))
+    {
+        savan_subscriber_t *subscriber = NULL;
+        axiom_node_t *sub_node = NULL;
+        axiom_node_t *endto_node = NULL;
+        axiom_node_t *delivery_node = NULL;
+        axiom_node_t *notify_node = NULL;
+        axiom_node_t *filter_node = NULL;
+        axiom_node_t *expires_node = NULL;
+
+        axiom_element_t *sub_elem = NULL;
+        axiom_element_t *endto_elem = NULL;
+        axiom_element_t *delivery_elem = NULL;
+        axiom_element_t *notify_elem = NULL;
+        axiom_element_t *expires_elem = NULL;
+        axiom_element_t *filter_elem = NULL;
+
+        axis2_char_t *endto = NULL;
+        axis2_char_t *notify = NULL;
+        axis2_char_t *expires = NULL;
+        axis2_char_t *filter = NULL;
+
+        axis2_endpoint_ref_t *endto_epr = NULL;
+        axis2_endpoint_ref_t *notify_epr = NULL;
+     
+        sub_node = axiom_children_qname_iterator_next(subs_iter, env);
+        if(sub_node)
+        {
+            subscriber = savan_subscriber_create(env);
+            if (!subscriber)
+            {
+                AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[ML] Failed to create a"
+                    "subscriber instance");
+                return NULL;
+            }
+            /* Now read each sub element of Subscribe element */
+
+            /* EndTo */
+            qname = axutil_qname_create(env, ELEM_NAME_ENDTO, EVENTING_NAMESPACE, NULL);
+            endto_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+                sub_node, &endto_node);
+            axutil_qname_free(qname, env);
+
+            endto = axiom_element_get_text(endto_elem, env, endto_node);
+
+            endto_epr = axis2_endpoint_ref_create(env, endto);
+
+            savan_subscriber_set_end_to(subscriber, env, endto_epr);
+
+            /* Get Delivery element and read NotifyTo */
+            qname = axutil_qname_create(env, ELEM_NAME_DELIVERY, EVENTING_NAMESPACE, NULL);
+            delivery_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+                sub_node, &delivery_node);
+            axutil_qname_free(qname, env);
+
+            qname = axutil_qname_create(env, ELEM_NAME_NOTIFYTO, EVENTING_NAMESPACE, NULL);
+            notify_elem = axiom_element_get_first_child_with_qname(delivery_elem, env, qname,
+                delivery_node, &notify_node);
+            axutil_qname_free(qname, env);
+            notify = axiom_element_get_text(notify_elem, env, notify_node);
+
+            notify_epr = axis2_endpoint_ref_create(env, notify);
+
+            savan_subscriber_set_notify_to(subscriber, env, notify_epr);
+
+            /* Expires */
+            qname = axutil_qname_create(env, ELEM_NAME_EXPIRES, EVENTING_NAMESPACE, NULL);
+            expires_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+                sub_node, &expires_node);
+            axutil_qname_free(qname, env);
+
+            expires = axiom_element_get_text(expires_elem, env, expires_node);
+
+            savan_subscriber_set_expires(subscriber, env, expires);
+
+            /* Filter */
+            qname = axutil_qname_create(env, ELEM_NAME_FILTER, EVENTING_NAMESPACE, NULL);
+            filter_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+                sub_node, &filter_node);
+            axutil_qname_free(qname, env);
+
+            filter = axiom_element_get_text(filter_elem, env, filter_node);
+
+            savan_subscriber_set_filter(subscriber, env, filter);
+            axutil_hash_set(subscriber_list, savan_subscriber_get_id(subscriber, 
+                env), AXIS2_HASH_KEY_STRING, subscriber);
+        }
+    }
+    axutil_qname_free(qname, env);
+    return subscriber_list;
 }
+
+
+
+static axiom_node_t *
+build_add_subscriber_om_payload(
+    const axutil_env_t *env,
+    savan_subscriber_t *subscriber)
+{
+    axiom_node_t *add_node = NULL;
+    axiom_element_t* add_ele = NULL;
+    axiom_namespace_t *ns = NULL;
+    axiom_namespace_t *ns1 = NULL;
+    axiom_node_t *sub_node = NULL;
+    axiom_node_t *id_node = NULL;
+    axiom_node_t *topic_node = NULL;
+    axiom_node_t *endto_node = NULL;
+    axiom_node_t *delivery_node = NULL;
+    axiom_node_t *notify_node = NULL;
+    axiom_node_t *filter_node = NULL;
+    axiom_node_t *expires_node = NULL;
+    axiom_element_t* sub_elem = NULL;
+    axiom_element_t* id_elem = NULL;
+    axiom_element_t* topic_elem = NULL;
+    axiom_element_t* endto_elem = NULL;
+    axiom_element_t* delivery_elem = NULL;
+    axiom_element_t* notify_elem = NULL;
+    axiom_element_t* filter_elem = NULL;
+    axiom_element_t* expires_elem = NULL;
+    const axis2_char_t *endto = NULL;
+    const axis2_char_t *notify = NULL;
+    axis2_char_t *filter = NULL;
+    const axis2_char_t *expires = NULL;
+    axis2_char_t *topic = NULL;
+    axis2_char_t *id = NULL;
+
+    axis2_endpoint_ref_t *endto_ref = savan_subscriber_get_end_to(subscriber, env);
+    endto = axis2_endpoint_ref_get_address(endto_ref, env);
+    axis2_endpoint_ref_t *notify_ref = savan_subscriber_get_notify_to(subscriber, env);
+    notify = axis2_endpoint_ref_get_address(notify_ref, env);
+    filter = savan_subscriber_get_filter(subscriber, env); 
+    expires = savan_subscriber_get_expires(subscriber, env); 
+    id = savan_subscriber_get_id(subscriber, env);
+
+    ns = axiom_namespace_create (env, EVENTING_NAMESPACE, EVENTING_NS_PREFIX);
+    ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX);
+    add_ele = axiom_element_create(env, NULL, ELEM_NAME_ADD_SUBSCRIBER, ns1, &add_node);
+    
+    /* create the id element */
+    if(id)
+    {
+        id_elem = axiom_element_create(env, add_node, ELEM_NAME_ID, ns1, &id_node);
+            axiom_element_set_text(id_elem, env, id, id_node);
+    }
+    /* create the topic element */
+    topic_elem = axiom_element_create(env, add_node, ELEM_NAME_TOPIC, ns1, &topic_node);
+    topic = savan_subscriber_get_topic(subscriber, env);
+    if(topic)
+        axiom_element_set_text(topic_elem, env, topic, topic_node);
+    /* create the subscriber element */
+    sub_elem = axiom_element_create(env, add_node, ELEM_NAME_SUBSCRIBE, ns, &sub_node);
+    
+    /* EndTo element */
+    endto_elem = axiom_element_create(env, sub_node, ELEM_NAME_ENDTO, ns,
+        &endto_node);
+    axiom_element_set_text(endto_elem, env, endto, endto_node);
+    
+    /* Delivery element */
+    delivery_elem = axiom_element_create(env, sub_node, ELEM_NAME_DELIVERY, ns,
+        &delivery_node);
+        
+    notify_elem = axiom_element_create(env, delivery_node, ELEM_NAME_NOTIFYTO, ns,
+        &notify_node);
+    axiom_element_set_text(notify_elem, env, notify, notify_node);
+    
+    /* Expires element */
+    expires_elem = axiom_element_create(env, sub_node, ELEM_NAME_EXPIRES, ns,
+        &expires_node);
+    axiom_element_set_text(expires_elem, env, expires, expires_node);
+    /* Filter element */
+    filter_elem = axiom_element_create(env, sub_node, ELEM_NAME_FILTER, ns,
+        &endto_node);
+    axiom_element_set_text(filter_elem, env, filter, filter_node);
+    
+    return add_node;
+}
+
 
 /******************************************************************************/