You are viewing a plain text version of this content. The canonical link for it is here.
Posted to savan-dev@ws.apache.org by da...@apache.org on 2007/06/21 13:35:46 UTC

svn commit: r549451 - in /webservices/savan/trunk/c: include/ samples/client/subscriber/ samples/server/publisher/ src/client/ src/core/ src/handlers/ src/msgreceivers/ src/subscribers/

Author: damitha
Date: Thu Jun 21 04:35:44 2007
New Revision: 549451

URL: http://svn.apache.org/viewvc?view=rev&rev=549451
Log:
The publisher sample now works with the new changes in savanc.

Modified:
    webservices/savan/trunk/c/include/savan_publishing_client.h
    webservices/savan/trunk/c/include/savan_subscriber.h
    webservices/savan/trunk/c/samples/client/subscriber/subscriber.c
    webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c
    webservices/savan/trunk/c/samples/server/publisher/services.xml
    webservices/savan/trunk/c/src/client/savan_publishing_client.c
    webservices/savan/trunk/c/src/core/savan_sub_processor.c
    webservices/savan/trunk/c/src/handlers/savan_out_handler.c
    webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c
    webservices/savan/trunk/c/src/subscribers/savan_subscriber.c

Modified: webservices/savan/trunk/c/include/savan_publishing_client.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_publishing_client.h?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/include/savan_publishing_client.h (original)
+++ webservices/savan/trunk/c/include/savan_publishing_client.h Thu Jun 21 04:35:44 2007
@@ -57,7 +57,13 @@
     AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
     savan_publishing_client_create(
         const axutil_env_t *env,
-        axutil_hash_t *subscriber_list);
+        axis2_conf_ctx_t *conf_ctx,
+        axis2_svc_t *svc);
+
+    AXIS2_EXTERN void AXIS2_CALL
+    savan_publishing_client_free(
+        savan_publishing_client_t *client, 
+        const axutil_env_t *env);
 
 /** @} */
 #ifdef __cplusplus

Modified: webservices/savan/trunk/c/include/savan_subscriber.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_subscriber.h?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/include/savan_subscriber.h (original)
+++ webservices/savan/trunk/c/include/savan_subscriber.h Thu Jun 21 04:35:44 2007
@@ -177,7 +177,7 @@
     savan_subscriber_publish(
         savan_subscriber_t *subscriber,
         const axutil_env_t *env,
-        const void *payload);
+        struct axis2_msg_ctx *msg_ctx);
 
     /**
      * Set whether the subscription is renewed or not.

Modified: webservices/savan/trunk/c/samples/client/subscriber/subscriber.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/samples/client/subscriber/subscriber.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/samples/client/subscriber/subscriber.c (original)
+++ webservices/savan/trunk/c/samples/client/subscriber/subscriber.c Thu Jun 21 04:35:44 2007
@@ -51,7 +51,7 @@
 
     client_home = AXIS2_GETENV("AXIS2C_HOME");
     
-    init_event_source((axutil_env_t*)env, client_home);
+    /*init_event_source((axutil_env_t*)env, client_home);*/
     
     /* Set end point reference of echo service */
     address = "http://localhost:9090/axis2/services/publisher";

Modified: webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c (original)
+++ webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c Thu Jun 21 04:35:44 2007
@@ -38,9 +38,7 @@
 typedef struct publisher_data
 {
     axutil_env_t *env;
-    axis2_svc_t *svc;
-    axis2_conf_ctx_t *conf_ctx;
-    axutil_hash_t *subscriber_list;
+    axis2_conf_t *conf;
 }publisher_data_t;
 
 int AXIS2_CALL
@@ -66,6 +64,11 @@
 publisher_init(axis2_svc_skeleton_t *svc_skeleton,
           const axutil_env_t *env);
 
+static void
+start_publisher_thread(
+    const axutil_env_t *env,
+    axis2_conf_t *conf);
+
 int AXIS2_CALL 
 publisher_init_with_conf(
     axis2_svc_skeleton_t *svc_skeleton,
@@ -131,6 +134,17 @@
     return AXIS2_SUCCESS;
 }
 
+int AXIS2_CALL 
+publisher_init_with_conf(
+    axis2_svc_skeleton_t *svc_skeleton,
+    const axutil_env_t *env,
+    axis2_conf_t *conf)
+{
+    publisher_init(svc_skeleton, env);
+    start_publisher_thread(env, conf); 
+    return AXIS2_SUCCESS;
+}
+
 /*
  * This method invokes the right service method 
  */
@@ -141,13 +155,24 @@
     axiom_node_t *node,
     axis2_msg_ctx_t *msg_ctx)
 {
+    axis2_conf_ctx_t *conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+    axis2_conf_t *conf = axis2_conf_ctx_get_conf(conf_ctx, env);
+
+    start_publisher_thread(env, conf); 
+    return axis2_publisher_start(env, node);
+}
+
+static void
+start_publisher_thread(
+    const axutil_env_t *env,
+    axis2_conf_t *conf)
+{
 
 	axutil_thread_t *worker_thread = NULL;
 	publisher_data_t *data = NULL;
-    axutil_param_t *param = NULL;
-    axutil_hash_t *subs_list = NULL;
 
     printf("publisher invoke called.\n");
+    
 
     /* Invoke the business logic.
      * Depending on the function name invoke the correct impl method.
@@ -155,22 +180,16 @@
 
     data = AXIS2_MALLOC(env->allocator, sizeof(publisher_data_t));
     data->env = (axutil_env_t*)env;
-    data->svc =  axis2_msg_ctx_get_svc(msg_ctx, env);
-    data->conf_ctx =  axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
-    param = axis2_svc_get_param(data->svc, env, SAVAN_SUBSCRIBER_LIST);
-    subs_list = axutil_param_get_value(param, env);
-    data->subscriber_list = subs_list;
+    data->conf = conf;
     
     worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
         publisher_worker_func, (void*)data);
     if(! worker_thread)
     {
         printf("failed to create thread");
-        return AXIS2_FAILURE;
+        return;
     }
     axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
-    
-    return axis2_publisher_start(env, node);
 }
 
 /* On fault, handle the fault */
@@ -224,16 +243,14 @@
     axiom_namespace_t *test_ns = NULL;
     axiom_node_t *test_node = NULL;
     axiom_element_t* test_elem = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
+    axis2_conf_t *conf = NULL;
     axis2_svc_t *svc = NULL;
-    savan_publishing_client_t *pub_client = NULL;
-    axutil_hash_t *subs_list = NULL;
+    axutil_param_t *param = NULL;
+    axis2_conf_ctx_t *conf_ctx = NULL;
     
     publisher_data_t *mydata = (publisher_data_t*)data;
     main_env = mydata->env;
-    conf_ctx = mydata->conf_ctx;
-    svc = mydata->svc;
-    subs_list = mydata->subscriber_list;
+    conf = mydata->conf;
     
     env = axutil_init_thread_env(main_env);
 
@@ -246,12 +263,23 @@
     
     axiom_element_set_text(test_elem, env, "test data", test_node);
 
-    pub_client = savan_publishing_client_create(env, subs_list);
-    
+    svc = axis2_conf_get_svc(conf, env, "publisher");
+    conf_ctx = axis2_conf_ctx_create(env, conf);
     while(1)
     {
-        savan_publishing_client_publish(pub_client, env, test_node);
+        axutil_hash_t *subs_list = NULL;
 
+        param = axis2_svc_get_param(svc, env, SAVAN_SUBSCRIBER_LIST);
+        if(param)
+            subs_list = axutil_param_get_value(param, env);
+        if(subs_list)
+        {
+            savan_publishing_client_t *pub_client = NULL;
+
+            pub_client = savan_publishing_client_create(env, conf_ctx, svc);
+            savan_publishing_client_publish(pub_client, env, test_node);
+            savan_publishing_client_free(pub_client, env);
+        }
         AXIS2_SLEEP(10);
         
         printf("Returned from sleep\n");

Modified: webservices/savan/trunk/c/samples/server/publisher/services.xml
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/samples/server/publisher/services.xml?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/samples/server/publisher/services.xml (original)
+++ webservices/savan/trunk/c/samples/server/publisher/services.xml Thu Jun 21 04:35:44 2007
@@ -1,6 +1,6 @@
 <service name="publisher">
     <parameter name="ServiceClass" locked="xsd:false">publisher</parameter>
-
+    <parameter name="loadServiceAtStartup" locked="xsd:false">true</parameter>
    <description>
         This is a testing service , to test the system is working or not
    </description>

Modified: webservices/savan/trunk/c/src/client/savan_publishing_client.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/client/savan_publishing_client.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/src/client/savan_publishing_client.c (original)
+++ webservices/savan/trunk/c/src/client/savan_publishing_client.c Thu Jun 21 04:35:44 2007
@@ -19,6 +19,8 @@
 #include <axiom_soap_body.h>
 #include <axis2_options.h>
 #include <axutil_array_list.h>
+#include <axis2_conf_ctx.h>
+#include <axis2_svc.h>
 #include <platforms/axutil_platform_auto_sense.h>
 
 #include <savan_publishing_client.h>
@@ -26,7 +28,8 @@
 
 struct savan_publishing_client_t
 {
-    axutil_hash_t *subscriber_list;
+    axis2_conf_ctx_t *conf_ctx;
+    axis2_svc_t *svc;
 };
 
 /******************************************************************************/
@@ -37,27 +40,37 @@
 AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
 savan_publishing_client_create(
     const axutil_env_t *env,
-    axutil_hash_t *subscriber_list)
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_svc_t *svc)
 {
     savan_publishing_client_t *client = NULL;
-    
+
     AXIS2_ENV_CHECK(env, NULL);
-    
+
     client = AXIS2_MALLOC(env->allocator, sizeof(savan_publishing_client_t));
-     
+
     if (!client)
-    { 
+    {
         AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
-        return NULL;        
+        return NULL;
     }
-    if(subscriber_list) 
-        client->subscriber_list = subscriber_list;
+
+    client->conf_ctx = conf_ctx;
+    client->svc = svc;
 
     return client;
 }
 
 /******************************************************************************/
 
+AXIS2_EXTERN void AXIS2_CALL
+savan_publishing_client_free(
+    savan_publishing_client_t *client, 
+    const axutil_env_t *env)
+{
+    AXIS2_FREE(env->allocator, client);
+}
+
 AXIS2_EXTERN axis2_status_t AXIS2_CALL
 savan_publishing_client_publish(
     savan_publishing_client_t *client,
@@ -70,7 +83,6 @@
     axis2_options_t *options = NULL;
     axis2_svc_client_t* svc_client = NULL;
     axutil_qname_t *op_qname = NULL;
-    axutil_property_t *property = NULL;
 
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
 
@@ -95,7 +107,8 @@
     op_qname = axutil_qname_create(env, "publish", NULL, NULL);
         
     /* Create service client */
-    svc_client = axis2_svc_client_create(env, repo_path); 
+    svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, repo_path,
+        client->conf_ctx, client->svc); 
     if (!svc_client)
     {
         AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to create a"
@@ -103,16 +116,14 @@
         return AXIS2_FAILURE;
     }
 
-    property = axutil_property_create_with_args(env, 0, 1,
-        axutil_hash_free_void_arg, client->subscriber_list);
-    axis2_options_set_property(options, env, SAVAN_SUBSCRIBER_LIST, property);
     /* Set service client options */
     axis2_svc_client_set_options(svc_client, env, options);
 
     axis2_svc_client_engage_module(svc_client, env, "savan");
 
     /* Send publishing message */
-    axis2_svc_client_send_robust_with_op_qname(svc_client, env, op_qname, payload); 
+    axis2_svc_client_send_robust_with_op_qname(svc_client, env, op_qname, 
+        payload); 
     
     return AXIS2_SUCCESS;
 }

Modified: webservices/savan/trunk/c/src/core/savan_sub_processor.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/core/savan_sub_processor.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/src/core/savan_sub_processor.c (original)
+++ webservices/savan/trunk/c/src/core/savan_sub_processor.c Thu Jun 21 04:35:44 2007
@@ -116,7 +116,6 @@
     /* Store sub id in msg ctx to be used by the msg receiver */
     id = savan_subscriber_get_id(subscriber, env);
     savan_sub_processor_set_sub_id_to_msg_ctx(env, msg_ctx, id);
-
     savan_util_add_subscriber(env, msg_ctx, subscriber);
     
     return AXIS2_SUCCESS;

Modified: webservices/savan/trunk/c/src/handlers/savan_out_handler.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/handlers/savan_out_handler.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/src/handlers/savan_out_handler.c (original)
+++ webservices/savan/trunk/c/src/handlers/savan_out_handler.c Thu Jun 21 04:35:44 2007
@@ -90,42 +90,58 @@
         " %d", msg_type);
     if (msg_type == SAVAN_MSG_TYPE_UNKNOWN)
     {
-        axutil_property_t *subs_list_property = NULL;
-        axutil_hash_t *subscriber_list = NULL;
+        axutil_hash_t *store = NULL;
         axutil_hash_index_t *hi = NULL;
-        void *val = NULL;
+        axis2_svc_t *svc = NULL;
+        axutil_param_t *param = NULL;
         /* Treat unknown msgs as msgs for publishing */
-        subs_list_property = axis2_msg_ctx_get_property(msg_ctx, env, 
-            SAVAN_SUBSCRIBER_LIST);
-        if(subs_list_property)
-            subscriber_list = axutil_property_get_value(subs_list_property, env);    
-        if (!subscriber_list)
+
+        svc =  axis2_msg_ctx_get_svc(msg_ctx, env);
+        if (!svc)
         {
+            printf("came201\n");
             AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
-                "Subscribe subscriber_list is null");
+                "Service not found");
             return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
         }
+
+        param = axis2_svc_get_param(svc, env, SAVAN_SUBSCRIBER_LIST);
+        if (!param)
+        {
+            printf("came202\n");
+            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
+                "Subscribe store not found");
+            return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
+        }
+
+        store = (axutil_hash_t*)axutil_param_get_value(param, env);
+        if (!store)
+        {
+            printf("came203\n");
+            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
+                "Subscribe store is null");
+            return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
+        }
+
+        /* Treat unknown msgs as msgs for publishing */
         
         /* Iterate the subscribe store and send the msg to each one */
 
-        for (hi = axutil_hash_first(subscriber_list, env); hi; hi = 
+        for (hi = axutil_hash_first(store, env); hi; hi = 
             axutil_hash_next(env, hi))
         {
-            axiom_soap_envelope_t *soap_env = NULL;
-            axiom_soap_body_t *soap_body = NULL;
-            axiom_node_t *payload = NULL;
+            printf("came204\n");
+            void *val = NULL;
             savan_subscriber_t * sub = NULL;
             axutil_hash_this(hi, NULL, NULL, &val);
             sub = (savan_subscriber_t *)val;
             if (sub)
             {
+                printf("came104\n");
                 axis2_char_t *id = savan_subscriber_get_id(sub, env);
                 AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][out handler] "
                     "Publishing to %s...", id);
-                soap_env = axis2_msg_ctx_get_soap_envelope(msg_ctx, env);
-                soap_body = axiom_soap_envelope_get_body(soap_env, env);
-                payload = axiom_soap_body_get_base_node(soap_body, env);
-                savan_subscriber_publish(sub, env, payload);
+                savan_subscriber_publish(sub, env, msg_ctx);
             }
 
             val = NULL;

Modified: webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c (original)
+++ webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c Thu Jun 21 04:35:44 2007
@@ -186,7 +186,7 @@
     axis2_conf_ctx_t *conf_ctx = NULL;
     axis2_conf_t *conf = NULL;
     axis2_module_desc_t *module_desc = NULL;
-    /*savan_subscriber_t *subscriber = NULL;*/
+    savan_subscriber_t *subscriber = NULL;
     
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][msg recv] "
         "handle sub request...");
@@ -248,8 +248,8 @@
     axiom_element_set_text(id_elem, env, id, id_node);
     
     /* Expires element. Get expiry time from subscriber and set */
-    /*subscriber = savan_util_get_subscriber_from_msg(env, msg_ctx, id);
-    expires = savan_subscriber_get_expires(subscriber, env);*/
+    subscriber = savan_util_get_subscriber_from_msg(env, msg_ctx, id);
+    expires = savan_subscriber_get_expires(subscriber, env);
     
     expires_elem = axiom_element_create(env, response_node, ELEM_NAME_EXPIRES, ns,
         &expires_node);

Modified: webservices/savan/trunk/c/src/subscribers/savan_subscriber.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/subscribers/savan_subscriber.c?view=diff&rev=549451&r1=549450&r2=549451
==============================================================================
--- webservices/savan/trunk/c/src/subscribers/savan_subscriber.c (original)
+++ webservices/savan/trunk/c/src/subscribers/savan_subscriber.c Thu Jun 21 04:35:44 2007
@@ -239,34 +239,51 @@
 savan_subscriber_publish(
     savan_subscriber_t *subscriber,
     const axutil_env_t *env,
-    const void *payload)
+    struct axis2_msg_ctx *msg_ctx)
 {
     axis2_svc_client_t *svc_client = NULL;
+    axis2_op_client_t *op_client = NULL;
+    axis2_conf_ctx_t *conf_ctx = NULL;
+    axis2_conf_t *conf = NULL;
+    axis2_svc_t *svc = NULL;
     axis2_char_t *path = NULL;
     axis2_options_t *options = NULL;
     axis2_status_t status = AXIS2_SUCCESS;
-    axis2_endpoint_ref_t *notify_to = NULL;
-    axiom_node_t *ret_node = NULL;
+    axutil_qname_t *op_qname = NULL;
 
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Start:savan_subscriber_publish");
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+        "[savan] Start:savan_subscriber_publish");
 	
+
     path = AXIS2_GETENV("AXIS2C_HOME");
+    conf_ctx =  axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+    conf =  axis2_conf_ctx_get_conf(conf_ctx, env);
+
+    /* Get anonymous service from conf. This will be null for the first time,
+     * but then it will be created when we create the svc_client */
+    svc = axis2_conf_get_svc(conf, env, AXIS2_ANON_SERVICE);
+
+    svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, path,
+        conf_ctx, svc);
 
-    svc_client = axis2_svc_client_create(env, path);
     /* Setup options */
     options = axis2_options_create(env);
-    notify_to = savan_subscriber_get_notify_to(subscriber, env);
-    if(notify_to)
-        printf("publishing to:%s\n", axis2_endpoint_ref_get_address(notify_to, env));
-    axis2_options_set_to(options, env, notify_to);
+    axis2_options_set_to(options, env, subscriber->notify_to);
+
     /* Set service client options */
     axis2_svc_client_set_options(svc_client, env, options);
 
     /* Engage addressing module */
-    axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
-    ret_node = axis2_svc_client_send_receive(svc_client, env, payload);
-    
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] End:savan_subscriber_publish");
+    /*axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);*/
+
+    op_qname = axutil_qname_create(env, AXIS2_ANON_OUT_ONLY_OP, NULL, NULL);
+
+    op_client = axis2_svc_client_create_op_client(svc_client, env,
+        op_qname);
+
+    axis2_op_client_add_msg_ctx(op_client, env, msg_ctx);
+    status = axis2_op_client_execute(op_client, env, AXIS2_TRUE);
+    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][subscribe] publish...");
     return status;
 }
 
@@ -301,7 +318,7 @@
 {
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     
-    subscriber->topic = topic;
+    subscriber->topic = axutil_strdup(env, topic);
 
     return AXIS2_SUCCESS;
 }