You are viewing a plain text version of this content. The canonical link for it is here.
Posted to savan-dev@ws.apache.org by da...@apache.org on 2007/06/20 09:25:33 UTC
svn commit: r548965 - in /webservices/savan/trunk/c: include/
samples/server/publisher/ src/client/ src/core/ src/handlers/
src/msgreceivers/ src/subscribers/ src/util/
Author: damitha
Date: Wed Jun 20 00:25:32 2007
New Revision: 548965
URL: http://svn.apache.org/viewvc?view=rev&rev=548965
Log:
More work on savan to improve subscription manager
Modified:
webservices/savan/trunk/c/include/savan_constants.h
webservices/savan/trunk/c/include/savan_publishing_client.h
webservices/savan/trunk/c/include/savan_subscriber.h
webservices/savan/trunk/c/include/savan_util.h
webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c
webservices/savan/trunk/c/src/client/savan_publishing_client.c
webservices/savan/trunk/c/src/core/savan_sub_processor.c
webservices/savan/trunk/c/src/handlers/savan_out_handler.c
webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c
webservices/savan/trunk/c/src/subscribers/savan_subscriber.c
webservices/savan/trunk/c/src/util/savan_util.c
Modified: webservices/savan/trunk/c/include/savan_constants.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_constants.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_constants.h (original)
+++ webservices/savan/trunk/c/include/savan_constants.h Wed Jun 20 00:25:32 2007
@@ -52,9 +52,13 @@
#define SAVAN_ACTIONS_GET_STATUS "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus"
#define SAVAN_ACTIONS_GET_STATUS_RESPONSE "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse"
-#define SUBSCRIBER_STORE "SubscriberStore"
#define EVENTING_NAMESPACE "http://schemas.xmlsoap.org/ws/2004/08/eventing"
#define EVENTING_NS_PREFIX "wse"
+#define SAVAN_NAMESPACE "http://ws.apache.org/savan"
+#define SAVAN_NS_PREFIX "savan"
+#define ELEM_NAME_SUBSCRIBERS "Subscribers"
+#define ELEM_NAME_ADD_SUBSCRIBER "AddSubscriber"
+#define ELEM_NAME_TOPIC "Topic"
#define DEFAULT_DELIVERY_MODE "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push"
/* Eventing element names */
@@ -86,6 +90,7 @@
#define SAVAN_OP_KEY_FILTER "savan_op_key_filter"
#define SAVAN_KEY_SUB_ID "savan_key_subscriber_id"
+#define SAVAN_SUBSCRIBER_LIST "savan_subs_list"
/** @} */
#ifdef __cplusplus
Modified: webservices/savan/trunk/c/include/savan_publishing_client.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_publishing_client.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_publishing_client.h (original)
+++ webservices/savan/trunk/c/include/savan_publishing_client.h Wed Jun 20 00:25:32 2007
@@ -45,6 +45,7 @@
* Publish the given message to all subscribed clients
* @param client the publishing client object
* @param env pointer to environment struct
+ * @param payload
* @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
*/
AXIS2_EXTERN axis2_status_t AXIS2_CALL
@@ -56,8 +57,7 @@
AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
savan_publishing_client_create(
const axutil_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_svc_t *svc);
+ axutil_hash_t *subscriber_list);
/** @} */
#ifdef __cplusplus
Modified: webservices/savan/trunk/c/include/savan_subscriber.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_subscriber.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_subscriber.h (original)
+++ webservices/savan/trunk/c/include/savan_subscriber.h Wed Jun 20 00:25:32 2007
@@ -73,7 +73,18 @@
savan_subscriber_t *subscriber,
const axutil_env_t *env,
axis2_endpoint_ref_t *end_to);
-
+
+ /**
+ * Get EndTo end point.
+ * @param subscriber pointer to subscriber
+ * @param env pointer to environment struct
+ * @return end_to
+ */
+ axis2_endpoint_ref_t *AXIS2_CALL
+ savan_subscriber_get_end_to(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env);
+
/**
* Set NotifyTo end point.
* @param subscriber pointer to subscriber
@@ -87,6 +98,17 @@
axis2_endpoint_ref_t *notify_to);
/**
+ * Get NotifyTo end point.
+ * @param subscriber pointer to subscriber
+ * @param env pointer to environment struct
+ * @return notify_to
+ */
+ axis2_endpoint_ref_t *AXIS2_CALL
+ savan_subscriber_get_notify_to(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env);
+
+ /**
* Set delivery mode.
* @param subscriber pointer to subscriber
* @param env pointer to environment struct
@@ -114,7 +136,7 @@
* Get expires.
* @param subscriber pointer to subscriber
* @param env pointer to environment struct
- * @return expire time on success, else NULL
+ * @return expire date and time as string
*/
axis2_char_t * AXIS2_CALL
savan_subscriber_get_expires(
@@ -134,17 +156,28 @@
const axis2_char_t *filter);
/**
+ * Get filter.
+ * @param subscriber pointer to subscriber
+ * @param env pointer to environment struct
+ * @return filter the filter string
+ */
+ axis2_char_t *AXIS2_CALL
+ savan_subscriber_get_filter(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env);
+
+ /**
* Publishes the given msg to the client.
* @param subscriber pointer to subscriber
* @param env pointer to environment struct
- * @param msg_ctx the msg to be published
+ * @param payload the content to be published
* @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
*/
axis2_status_t AXIS2_CALL
savan_subscriber_publish(
savan_subscriber_t *subscriber,
const axutil_env_t *env,
- struct axis2_msg_ctx *msg_ctx);
+ const void *payload);
/**
* Set whether the subscription is renewed or not.
@@ -180,6 +213,17 @@
savan_subscriber_create(
const axutil_env_t *env);
+ axis2_status_t AXIS2_CALL
+ savan_subscriber_set_topic(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env,
+ axis2_char_t *topic);
+
+ axis2_char_t *AXIS2_CALL
+ savan_subscriber_get_topic(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env);
+
/** @} */
#ifdef __cplusplus
}
Modified: webservices/savan/trunk/c/include/savan_util.h
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/include/savan_util.h?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/include/savan_util.h (original)
+++ webservices/savan/trunk/c/include/savan_util.h Wed Jun 20 00:25:32 2007
@@ -82,6 +82,19 @@
axis2_msg_ctx_t *msg_ctx);
/**
+ * Add the subscriber to subscription manager services' store
+ * @param env pointer to environment struct
+ * @param msg_ctx pointer to message context
+ * @param subscriber
+ * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
+ */
+ axis2_status_t AXIS2_CALL
+ savan_util_add_subscriber(
+ const axutil_env_t *env,
+ axis2_msg_ctx_t *msg_ctx,
+ savan_subscriber_t *subscriber);
+
+ /**
* Calculate and return an expiry time for the subscription
* @param env pointer to environment struct
* @return the expiry time on success, else NULL
@@ -101,6 +114,17 @@
savan_util_get_renewed_expiry_time(
const axutil_env_t *env,
axis2_char_t *expiry);
+
+ /**
+ * Create storage hash and set as a service parameter.
+ * @param env pointer to environment struct
+ * @param svc subscription service
+ * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
+ */
+ AXIS2_EXTERN axis2_status_t AXIS2_CALL
+ savan_util_set_sub_store(
+ axis2_svc_t *svc,
+ const axutil_env_t *env);
/** @} */
#ifdef __cplusplus
Modified: webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c (original)
+++ webservices/savan/trunk/c/samples/server/publisher/publisher_skeleton.c Wed Jun 20 00:25:32 2007
@@ -40,6 +40,7 @@
axutil_env_t *env;
axis2_svc_t *svc;
axis2_conf_ctx_t *conf_ctx;
+ axutil_hash_t *subscriber_list;
}publisher_data_t;
int AXIS2_CALL
@@ -54,16 +55,23 @@
* This method invokes the right service method
*/
axiom_node_t* AXIS2_CALL
-publisher_invoke(axis2_svc_skeleton_t *svc_skeleton,
- const axutil_env_t *env,
- axiom_node_t *node,
- axis2_msg_ctx_t *msg_ctx);
-
+publisher_invoke(
+ axis2_svc_skeleton_t *svc_skeleton,
+ const axutil_env_t *env,
+ axiom_node_t *node,
+ axis2_msg_ctx_t *msg_ctx);
+
int AXIS2_CALL
publisher_init(axis2_svc_skeleton_t *svc_skeleton,
const axutil_env_t *env);
+int AXIS2_CALL
+publisher_init_with_conf(
+ axis2_svc_skeleton_t *svc_skeleton,
+ const axutil_env_t *env,
+ axis2_conf_t *conf);
+
axiom_node_t* AXIS2_CALL
publisher_on_fault(axis2_svc_skeleton_t *svc_skeli,
const axutil_env_t *env, axiom_node_t *node);
@@ -77,7 +85,8 @@
publisher_init,
publisher_invoke,
publisher_on_fault,
- publisher_free
+ publisher_free,
+ publisher_init_with_conf
};
/*Create function */
@@ -126,14 +135,17 @@
* This method invokes the right service method
*/
axiom_node_t* AXIS2_CALL
-publisher_invoke(axis2_svc_skeleton_t *svc_skeleton,
- const axutil_env_t *env,
- axiom_node_t *node,
- axis2_msg_ctx_t *msg_ctx)
+publisher_invoke(
+ axis2_svc_skeleton_t *svc_skeleton,
+ const axutil_env_t *env,
+ axiom_node_t *node,
+ axis2_msg_ctx_t *msg_ctx)
{
axutil_thread_t *worker_thread = NULL;
publisher_data_t *data = NULL;
+ axutil_param_t *param = NULL;
+ axutil_hash_t *subs_list = NULL;
printf("publisher invoke called.\n");
@@ -145,6 +157,9 @@
data->env = (axutil_env_t*)env;
data->svc = axis2_msg_ctx_get_svc(msg_ctx, env);
data->conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+ param = axis2_svc_get_param(data->svc, env, SAVAN_SUBSCRIBER_LIST);
+ subs_list = axutil_param_get_value(param, env);
+ data->subscriber_list = subs_list;
worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
publisher_worker_func, (void*)data);
@@ -212,11 +227,13 @@
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_svc_t *svc = NULL;
savan_publishing_client_t *pub_client = NULL;
+ axutil_hash_t *subs_list = NULL;
publisher_data_t *mydata = (publisher_data_t*)data;
main_env = mydata->env;
conf_ctx = mydata->conf_ctx;
svc = mydata->svc;
+ subs_list = mydata->subscriber_list;
env = axutil_init_thread_env(main_env);
@@ -229,7 +246,7 @@
axiom_element_set_text(test_elem, env, "test data", test_node);
- pub_client = savan_publishing_client_create(env, conf_ctx, svc);
+ pub_client = savan_publishing_client_create(env, subs_list);
while(1)
{
Modified: webservices/savan/trunk/c/src/client/savan_publishing_client.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/client/savan_publishing_client.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/client/savan_publishing_client.c (original)
+++ webservices/savan/trunk/c/src/client/savan_publishing_client.c Wed Jun 20 00:25:32 2007
@@ -18,6 +18,7 @@
#include <axiom_element.h>
#include <axiom_soap_body.h>
#include <axis2_options.h>
+#include <axutil_array_list.h>
#include <platforms/axutil_platform_auto_sense.h>
#include <savan_publishing_client.h>
@@ -25,8 +26,7 @@
struct savan_publishing_client_t
{
- axis2_conf_ctx_t *conf_ctx;
- axis2_svc_t *svc;
+ axutil_hash_t *subscriber_list;
};
/******************************************************************************/
@@ -37,8 +37,7 @@
AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
savan_publishing_client_create(
const axutil_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_svc_t *svc)
+ axutil_hash_t *subscriber_list)
{
savan_publishing_client_t *client = NULL;
@@ -51,9 +50,8 @@
AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
return NULL;
}
-
- client->conf_ctx = conf_ctx;
- client->svc = svc;
+ if(subscriber_list)
+ client->subscriber_list = subscriber_list;
return client;
}
@@ -72,6 +70,7 @@
axis2_options_t *options = NULL;
axis2_svc_client_t* svc_client = NULL;
axutil_qname_t *op_qname = NULL;
+ axutil_property_t *property = NULL;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
@@ -95,9 +94,7 @@
op_qname = axutil_qname_create(env, "publish", NULL, NULL);
/* Create service client */
- svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, repo_path,
- client->conf_ctx, client->svc);
-
+ svc_client = axis2_svc_client_create(env, repo_path);
if (!svc_client)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to create a"
@@ -105,6 +102,9 @@
return AXIS2_FAILURE;
}
+ property = axutil_property_create_with_args(env, 0, 1,
+ axutil_hash_free_void_arg, client->subscriber_list);
+ axis2_options_set_property(options, env, SAVAN_SUBSCRIBER_LIST, property);
/* Set service client options */
axis2_svc_client_set_options(svc_client, env, options);
@@ -115,3 +115,4 @@
return AXIS2_SUCCESS;
}
+
Modified: webservices/savan/trunk/c/src/core/savan_sub_processor.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/core/savan_sub_processor.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/core/savan_sub_processor.c (original)
+++ webservices/savan/trunk/c/src/core/savan_sub_processor.c Wed Jun 20 00:25:32 2007
@@ -34,12 +34,7 @@
};
/* Function Prototypes ********************************************************/
-
-axis2_status_t AXIS2_CALL
-savan_sub_processor_set_sub_store(
- axis2_svc_t *svc,
- const axutil_env_t *env);
-
+
savan_subscriber_t * AXIS2_CALL
savan_sub_processor_create_subscriber_from_msg(
const axutil_env_t *env,
@@ -93,17 +88,9 @@
const axutil_env_t *env,
axis2_msg_ctx_t *msg_ctx)
{
- axis2_svc_t *subs_svc = NULL;
- axutil_param_t *param = NULL;
- axutil_hash_t *store = NULL;
savan_subscriber_t *subscriber = NULL;
axis2_char_t *expires = NULL;
axis2_char_t *id = NULL;
- axutil_qname_t *qname = NULL;
- axis2_module_desc_t *module_desc = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
- axis2_conf_t *conf = NULL;
- axis2_char_t *subs_svc_name = NULL;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
@@ -120,54 +107,17 @@
AXIS2_FAILURE);
return AXIS2_FAILURE;
}
-
- /* Set this subscriber inside a subscriber store maintained in the svc */
-
- conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
- conf = axis2_conf_ctx_get_conf(conf_ctx, env);
- qname = axutil_qname_create(env, "savan", NULL, NULL);
- module_desc = axis2_conf_get_module(conf, env, qname);
- axutil_qname_free(qname, env);
- param = axis2_module_desc_get_param(module_desc, env, "SubscriptionMgrName");
- if(param)
- {
- subs_svc_name = axutil_param_get_value(param, env);
- subs_svc = axis2_conf_get_svc(conf, env, subs_svc_name);
- }
- if(!subs_svc)
- {
- subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
- }
- if (!subs_svc)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract "
- "the service");
- return AXIS2_FAILURE;
- }
-
- param = axis2_svc_get_param(subs_svc, env, SUBSCRIBER_STORE);
- if (!param)
- {
- /* Store not found. Create and set it as a param */
- savan_sub_processor_set_sub_store(subs_svc, env);
- param = axis2_svc_get_param(subs_svc, env, SUBSCRIBER_STORE);
- }
-
- store = (axutil_hash_t*)axutil_param_get_value(param, env);
-
/* Set the expiry time on the subscription */
/* TODO : For now we are ignoring the Expiry sent by the client. Add support
* to consider this when setting the expiry time */
expires = savan_util_get_expiry_time(env);
savan_subscriber_set_expires(subscriber, env, expires);
- /* Store the created subscriber in the svc */
- axutil_hash_set(store, savan_subscriber_get_id(subscriber, env),
- AXIS2_HASH_KEY_STRING, subscriber);
-
/* Store sub id in msg ctx to be used by the msg receiver */
id = savan_subscriber_get_id(subscriber, env);
savan_sub_processor_set_sub_id_to_msg_ctx(env, msg_ctx, id);
+
+ savan_util_add_subscriber(env, msg_ctx, subscriber);
return AXIS2_SUCCESS;
}
@@ -289,40 +239,6 @@
/******************************************************************************/
-axis2_status_t AXIS2_CALL
-savan_sub_processor_set_sub_store(
- axis2_svc_t *svc,
- const axutil_env_t *env)
-{
- axutil_hash_t *store = NULL;
- axutil_param_t *param = NULL;
-
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
-
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
- "set sub store...");
-
- /* Create a hash map */
- store = axutil_hash_make(env);
- if (!store)
- {
- /* TODO : error reporting */
- return AXIS2_FAILURE;
- }
-
- /* Add the hash map as a parameter to the given service */
- param = axutil_param_create(env, SUBSCRIBER_STORE, (void*)store);
- if (!param)
- {
- /* TODO : error reporting */
- return AXIS2_FAILURE;
- }
-
- axis2_svc_add_param(svc, env, param);
-
- return AXIS2_SUCCESS;
-}
-
/******************************************************************************/
savan_subscriber_t * AXIS2_CALL
@@ -356,9 +272,11 @@
axis2_char_t *notify = NULL;
axis2_char_t *expires = NULL;
axis2_char_t *filter = NULL;
+ axis2_char_t *topic = NULL;
axis2_endpoint_ref_t *endto_epr = NULL;
axis2_endpoint_ref_t *notify_epr = NULL;
+ axis2_endpoint_ref_t *topic_epr = NULL;
AXIS2_ENV_CHECK(env, NULL);
@@ -460,6 +378,9 @@
savan_subscriber_set_filter(subscriber, env, filter);
+ topic_epr = axis2_msg_ctx_get_to(msg_ctx, env);
+ topic = (axis2_char_t *)axis2_endpoint_ref_get_address(topic_epr, env);
+ savan_subscriber_set_topic(subscriber, env, topic);
return subscriber;
}
Modified: webservices/savan/trunk/c/src/handlers/savan_out_handler.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/handlers/savan_out_handler.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/handlers/savan_out_handler.c (original)
+++ webservices/savan/trunk/c/src/handlers/savan_out_handler.c Wed Jun 20 00:25:32 2007
@@ -23,6 +23,7 @@
#include <axiom_soap_const.h>
#include <axiom_soap_envelope.h>
#include <axiom_soap_header.h>
+#include <axiom_soap_body.h>
#include <axiom_soap_header_block.h>
#include <axis2_op.h>
#include <axis2_msg_ctx.h>
@@ -79,60 +80,40 @@
struct axis2_msg_ctx *msg_ctx)
{
savan_message_types_t msg_type = SAVAN_MSG_TYPE_UNKNOWN;
- /*axis2_svc_t *svc = NULL;*/
- axutil_param_t *param = NULL;
- axutil_hash_t *store = NULL;
- const axis2_svc_t *svc = NULL;
- const axis2_char_t *svc_name = NULL;
- axutil_hash_index_t *hi = NULL;
- void *val = NULL;
AXIS2_ENV_CHECK( env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
- svc = axis2_msg_ctx_get_svc(msg_ctx, env);
- if (svc)
- svc_name = axis2_svc_get_name (svc, env);
-
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[%s][savan][out handler] invoke...",
- svc_name);
-
/* Determine the eventing msg type */
msg_type = savan_util_get_message_type(msg_ctx, env);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[%s][savan][out handler] msg type:"
- " %d", svc_name, msg_type);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][out handler] msg type:"
+ " %d", msg_type);
if (msg_type == SAVAN_MSG_TYPE_UNKNOWN)
{
+ axutil_property_t *subs_list_property = NULL;
+ axutil_hash_t *subscriber_list = NULL;
+ axutil_hash_index_t *hi = NULL;
+ void *val = NULL;
/* Treat unknown msgs as msgs for publishing */
-
- svc = axis2_msg_ctx_get_svc(msg_ctx, env);
- if (!svc)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
- "Service not found");
- return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
- }
-
- param = axis2_svc_get_param(svc, env, SUBSCRIBER_STORE);
- if (!param)
+ subs_list_property = axis2_msg_ctx_get_property(msg_ctx, env,
+ SAVAN_SUBSCRIBER_LIST);
+ if(subs_list_property)
+ subscriber_list = axutil_property_get_value(subs_list_property, env);
+ if (!subscriber_list)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
- "Subscribe store not found");
- return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
- }
-
- store = (axutil_hash_t*)axutil_param_get_value(param, env);
- if (!store)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][out handler] "
- "Subscribe store is null");
+ "Subscribe subscriber_list is null");
return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
}
/* Iterate the subscribe store and send the msg to each one */
-
- for (hi = axutil_hash_first(store, env); hi; hi = axutil_hash_next(env, hi))
+
+ for (hi = axutil_hash_first(subscriber_list, env); hi; hi =
+ axutil_hash_next(env, hi))
{
+ axiom_soap_envelope_t *soap_env = NULL;
+ axiom_soap_body_t *soap_body = NULL;
+ axiom_node_t *payload = NULL;
savan_subscriber_t * sub = NULL;
axutil_hash_this(hi, NULL, NULL, &val);
sub = (savan_subscriber_t *)val;
@@ -141,14 +122,19 @@
axis2_char_t *id = savan_subscriber_get_id(sub, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][out handler] "
"Publishing to %s...", id);
- savan_subscriber_publish(sub, env, msg_ctx);
+ soap_env = axis2_msg_ctx_get_soap_envelope(msg_ctx, env);
+ soap_body = axiom_soap_envelope_get_body(soap_env, env);
+ payload = axiom_soap_body_get_base_node(soap_body, env);
+ savan_subscriber_publish(sub, env, payload);
}
-
+
val = NULL;
}
- axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
+
+ axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
}
return AXIS2_SUCCESS;
}
+
Modified: webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c (original)
+++ webservices/savan/trunk/c/src/msgreceivers/savan_msg_recv.c Wed Jun 20 00:25:32 2007
@@ -186,7 +186,7 @@
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_conf_t *conf = NULL;
axis2_module_desc_t *module_desc = NULL;
- savan_subscriber_t *subscriber = NULL;
+ /*savan_subscriber_t *subscriber = NULL;*/
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][msg recv] "
"handle sub request...");
@@ -248,8 +248,8 @@
axiom_element_set_text(id_elem, env, id, id_node);
/* Expires element. Get expiry time from subscriber and set */
- subscriber = savan_util_get_subscriber_from_msg(env, msg_ctx, id);
- expires = savan_subscriber_get_expires(subscriber, env);
+ /*subscriber = savan_util_get_subscriber_from_msg(env, msg_ctx, id);
+ expires = savan_subscriber_get_expires(subscriber, env);*/
expires_elem = axiom_element_create(env, response_node, ELEM_NAME_EXPIRES, ns,
&expires_node);
Modified: webservices/savan/trunk/c/src/subscribers/savan_subscriber.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/subscribers/savan_subscriber.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/subscribers/savan_subscriber.c (original)
+++ webservices/savan/trunk/c/src/subscribers/savan_subscriber.c Wed Jun 20 00:25:32 2007
@@ -30,6 +30,7 @@
axis2_char_t *delivery_mode;
axis2_char_t *expires;
axis2_char_t *filter;
+ axis2_char_t *topic;
axis2_bool_t renewed;
};
@@ -112,6 +113,14 @@
return AXIS2_SUCCESS;
}
+axis2_endpoint_ref_t *AXIS2_CALL
+savan_subscriber_get_end_to(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env)
+{
+ return subscriber->end_to;
+}
+
/******************************************************************************/
axis2_status_t AXIS2_CALL
@@ -127,6 +136,14 @@
return AXIS2_SUCCESS;
}
+axis2_endpoint_ref_t *AXIS2_CALL
+savan_subscriber_get_notify_to(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env)
+{
+ return subscriber->notify_to;
+}
+
/******************************************************************************/
axis2_status_t AXIS2_CALL
@@ -208,54 +225,45 @@
return AXIS2_SUCCESS;
}
+axis2_char_t *AXIS2_CALL
+savan_subscriber_get_filter(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env)
+{
+ return subscriber->filter;
+}
+
/******************************************************************************/
axis2_status_t AXIS2_CALL
savan_subscriber_publish(
savan_subscriber_t *subscriber,
const axutil_env_t *env,
- struct axis2_msg_ctx *msg_ctx)
+ const void *payload)
{
axis2_svc_client_t *svc_client = NULL;
- axis2_op_client_t *op_client = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
- axis2_conf_t *conf = NULL;
- axis2_svc_t *svc = NULL;
axis2_char_t *path = NULL;
axis2_options_t *options = NULL;
axis2_status_t status = AXIS2_SUCCESS;
- axutil_qname_t *op_qname = NULL;
+ axis2_endpoint_ref_t *notify_to = NULL;
+ axiom_node_t *ret_node = NULL;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][subscribe] publish...");
path = AXIS2_GETENV("AXIS2C_HOME");
- conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
- conf = axis2_conf_ctx_get_conf(conf_ctx, env);
- /* Get anonymous service from conf. This will be null for the first time,
- * but then it will be created when we create the svc_client */
- svc = axis2_conf_get_svc(conf, env, AXIS2_ANON_SERVICE);
-
- svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, path,
- conf_ctx, svc);
+ svc_client = axis2_svc_client_create(env, path);
/* Setup options */
options = axis2_options_create(env);
- axis2_options_set_to(options, env, subscriber->notify_to);
-
+ notify_to = savan_subscriber_get_notify_to(subscriber, env);
+ axis2_options_set_to(options, env, notify_to);
/* Set service client options */
axis2_svc_client_set_options(svc_client, env, options);
/* Engage addressing module */
- /*axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);*/
-
- op_qname = axutil_qname_create(env, AXIS2_ANON_OUT_ONLY_OP, NULL, NULL);
-
- op_client = axis2_svc_client_create_op_client(svc_client, env,
- op_qname);
-
- axis2_op_client_add_msg_ctx(op_client, env, msg_ctx);
- status = axis2_op_client_execute(op_client, env, AXIS2_TRUE);
+ axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
+ ret_node = axis2_svc_client_send_receive(svc_client, env, payload);
return status;
}
@@ -282,3 +290,26 @@
{
return subscriber->renewed;
}
+
+axis2_status_t AXIS2_CALL
+savan_subscriber_set_topic(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env,
+ axis2_char_t *topic)
+{
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+
+ subscriber->topic = topic;
+
+ return AXIS2_SUCCESS;
+}
+
+axis2_char_t *AXIS2_CALL
+savan_subscriber_get_topic(
+ savan_subscriber_t *subscriber,
+ const axutil_env_t *env)
+{
+ return subscriber->topic;
+}
+
+
Modified: webservices/savan/trunk/c/src/util/savan_util.c
URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/util/savan_util.c?view=diff&rev=548965&r1=548964&r2=548965
==============================================================================
--- webservices/savan/trunk/c/src/util/savan_util.c (original)
+++ webservices/savan/trunk/c/src/util/savan_util.c Wed Jun 20 00:25:32 2007
@@ -15,6 +15,10 @@
*/
#include <axis2_msg_info_headers.h>
+#include <axis2_options.h>
+#include <axis2_svc_client.h>
+#include <axis2_endpoint_ref.h>
+#include <platforms/axutil_platform_auto_sense.h>
#include <axiom_soap.h>
#include <savan_util.h>
@@ -22,6 +26,33 @@
/******************************************************************************/
+static axis2_status_t
+add_subscriber_to_remote_subs_mgr(
+ const axutil_env_t *env,
+ savan_subscriber_t *subscriber,
+ axis2_char_t *subs_mgr_url);
+
+static axutil_hash_t *
+get_subscriber_list_from_remote_subs_mgr(
+ const axutil_env_t *env,
+ axis2_char_t *topic,
+ axis2_char_t *subs_mgr_url);
+
+static axiom_node_t *
+build_add_subscriber_om_payload(
+ const axutil_env_t *env,
+ savan_subscriber_t *subscriber);
+
+static axiom_node_t *
+build_subscribers_request_om_payload(
+ const axutil_env_t *env,
+ axis2_char_t *topic);
+
+static axutil_hash_t *
+process_subscriber_list_node(
+ const axutil_env_t *env,
+ axiom_node_t *subs_list_node);
+
savan_message_types_t AXIS2_CALL
savan_util_get_message_type(
axis2_msg_ctx_t *msg_ctx,
@@ -165,51 +196,490 @@
axis2_svc_t *subs_svc = NULL;
axutil_param_t *param = NULL;
axutil_hash_t *store = NULL;
- axutil_qname_t *qname = NULL;
- axis2_module_desc_t *module_desc = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
- axis2_conf_t *conf = NULL;
axis2_char_t *subs_svc_name = NULL;
+ axis2_char_t *topic = NULL;
+ axis2_endpoint_ref_t *topic_epr = NULL;
AXIS2_ENV_CHECK(env, NULL);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][util] "
"get subscriber store...");
- conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
- conf = axis2_conf_ctx_get_conf(conf_ctx, env);
- qname = axutil_qname_create(env, "savan", NULL, NULL);
- module_desc = axis2_conf_get_module(conf, env, qname);
- axutil_qname_free(qname, env);
- param = axis2_module_desc_get_param(module_desc, env, "SubscriptionMgrName");
+ subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
+ if (!subs_svc)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the "
+ "service");
+ return NULL;
+ }
+ param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrName");
if(param)
{
subs_svc_name = axutil_param_get_value(param, env);
- subs_svc = axis2_conf_get_svc(conf, env, subs_svc_name);
}
- if(!subs_svc)
+ if(subs_svc_name)
{
- subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
+ axis2_char_t *subs_mgr_url = NULL;
+ param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrURL");
+ subs_mgr_url = axutil_param_get_value(param, env);
+ topic_epr = axis2_msg_ctx_get_to(msg_ctx, env);
+ topic = (axis2_char_t *) axis2_endpoint_ref_get_address(topic_epr, env);
+ store = get_subscriber_list_from_remote_subs_mgr(env, topic, subs_mgr_url);
+ }
+ else
+ {
+ param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+ if (!param)
+ {
+ /* Store not found. Create and set it as a param */
+ savan_util_set_sub_store(subs_svc, env);
+ param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+ }
+
+ store = (axutil_hash_t*)axutil_param_get_value(param, env);
}
+ return store;
+}
+
+axis2_status_t AXIS2_CALL
+savan_util_add_subscriber(
+ const axutil_env_t *env,
+ axis2_msg_ctx_t *msg_ctx,
+ savan_subscriber_t *subscriber)
+{
+ axis2_svc_t *subs_svc = NULL;
+ axutil_param_t *param = NULL;
+ axutil_hash_t *store = NULL;
+ axis2_char_t *subs_svc_name = NULL;
+
+ subs_svc = axis2_msg_ctx_get_svc(msg_ctx, env);
if (!subs_svc)
{
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the "
- "service");
- return NULL;
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract "
+ "the service");
+ return AXIS2_FAILURE;
+ }
+ param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrName");
+ printf("came10\n");
+ if(param)
+ {
+ subs_svc_name = axutil_param_get_value(param, env);
+ }
+ if(subs_svc_name)
+ {
+ axis2_char_t *subs_mgr_url = NULL;
+ param = axis2_svc_get_param(subs_svc, env, "SubscriptionMgrURL");
+ subs_mgr_url = axutil_param_get_value(param, env);
+ add_subscriber_to_remote_subs_mgr(env, subscriber, subs_mgr_url);
}
+ else
+ {
+ param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+ if (!param)
+ {
+ /* Store not found. Create and set it as a param */
+ savan_util_set_sub_store(subs_svc, env);
+ param = axis2_svc_get_param(subs_svc, env, SAVAN_SUBSCRIBER_LIST);
+ }
+
+ store = (axutil_hash_t*)axutil_param_get_value(param, env);
+ /* Store the created subscriber in the svc */
+ axutil_hash_set(store, savan_subscriber_get_id(subscriber, env),
+ AXIS2_HASH_KEY_STRING, subscriber);
+ }
+
+ return AXIS2_SUCCESS;
+}
- param = axis2_svc_get_param(subs_svc, env, SUBSCRIBER_STORE);
+axis2_status_t AXIS2_CALL
+savan_util_set_sub_store(
+ axis2_svc_t *svc,
+ const axutil_env_t *env)
+{
+ axutil_hash_t *store = NULL;
+ axutil_param_t *param = NULL;
+
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+ "Start:set_sub_store");
+
+ /* Create a hash map */
+ store = axutil_hash_make(env);
+ if (!store)
+ {
+ /* TODO : error reporting */
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+ "Could not create subscriber store");
+ return AXIS2_FAILURE;
+ }
+
+ /* Add the hash map as a parameter to the given service */
+ param = axutil_param_create(env, SAVAN_SUBSCRIBER_LIST, (void*)store);
if (!param)
{
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the "
- "subscriber store param");
+ /* TODO : error reporting */
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+ "Could not create subscriber store param");
+ return AXIS2_FAILURE;
+ }
+
+ axis2_svc_add_param(svc, env, param);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan][sub processor] "
+ "End:set_sub_store");
+
+ return AXIS2_SUCCESS;
+}
+
+static axis2_status_t
+add_subscriber_to_remote_subs_mgr(
+ const axutil_env_t *env,
+ savan_subscriber_t *subscriber,
+ axis2_char_t *subs_mgr_url)
+{
+ const axis2_char_t *address = NULL;
+ axis2_endpoint_ref_t* endpoint_ref = NULL;
+ axis2_options_t *options = NULL;
+ const axis2_char_t *client_home = NULL;
+ axis2_svc_client_t* svc_client = NULL;
+ axiom_node_t *payload = NULL;
+
+ /* Set up the environment */
+ env = axutil_env_create_all("savan.log", AXIS2_LOG_LEVEL_TRACE);
+
+ /* Set end point reference of echo service */
+ address = subs_mgr_url;
+ printf("[savan] Using endpoint : %s\n", address);
+
+ /* Create EPR with given address */
+ endpoint_ref = axis2_endpoint_ref_create(env, address);
+
+ /* Setup options */
+ options = axis2_options_create(env);
+ axis2_options_set_to(options, env, endpoint_ref);
+ axis2_options_set_action(options, env,
+ "http://ws.apache.org/axis2/c/subscription/add_subscriber");
+
+ /* Set up deploy folder. It is from the deploy folder, the configuration is
+ * picked up using the axis2.xml file.
+ * In this sample client_home points to the Axis2/C default deploy folder.
+ * The client_home can be different from this folder on your system. For
+ * example, you may have a different folder (say, my_client_folder) with its
+ * own axis2.xml file. my_client_folder/modules will have the modules that
+ * the client uses
+ */
+ client_home = (const axis2_char_t *) AXIS2_GETENV("AXIS2C_HOME");
+ if (!client_home)
+ client_home = "../../deploy";
+
+ /* Create service client */
+ svc_client = axis2_svc_client_create(env, client_home);
+ if (!svc_client)
+ {
+ printf("Error creating service client\n");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Stub invoke FAILED: Error code:"
+ " %d :: %s", env->error->error_number,
+ AXIS2_ERROR_GET_MESSAGE(env->error));
+ return -1;
+ }
+ axis2_options_set_soap_version(options, env, AXIOM_SOAP11);
+ /* Set service client options */
+ axis2_svc_client_set_options(svc_client, env, options);
+
+ axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
+ payload = build_add_subscriber_om_payload(env, subscriber);
+ /* Send request */
+ axis2_svc_client_send_robust(svc_client, env, payload);
+
+ return AXIS2_SUCCESS;
+}
+
+static axutil_hash_t *
+get_subscriber_list_from_remote_subs_mgr(
+ const axutil_env_t *env,
+ axis2_char_t *topic,
+ axis2_char_t *subs_mgr_url)
+{
+ axis2_endpoint_ref_t* endpoint_ref = NULL;
+ axis2_options_t *options = NULL;
+ const axis2_char_t *client_home = NULL;
+ axis2_svc_client_t* svc_client = NULL;
+ axiom_node_t *payload = NULL;
+ axiom_node_t *ret_node = NULL;
+ axutil_hash_t *subscriber_list = NULL;
+
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[savan] Start:get_subscriber_list_from_remote_subs_mgr");
+ options = axis2_options_create(env);
+ axis2_options_set_action(options, env,
+ "http://ws.apache.org/axis2/c/subscription/get_subscriber_list");
+
+ client_home = AXIS2_GETENV("AXIS2C_HOME");
+ if (!client_home)
+ client_home = "../../deploy";
+
+ svc_client = axis2_svc_client_create(env, client_home);
+ if (!svc_client)
+ {
+ printf("Error creating service client\n");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[ML] Stub invoke FAILED: Error code:"
+ " %d :: %s", env->error->error_number,
+ AXIS2_ERROR_GET_MESSAGE(env->error));
return NULL;
}
+ endpoint_ref = axis2_endpoint_ref_create(env, subs_mgr_url);
+ axis2_options_set_to(options, env, endpoint_ref);
+ printf("[savan] Using endpoint : %s\n", subs_mgr_url);
+ axis2_options_set_soap_version(options, env, AXIOM_SOAP11);
+ axis2_svc_client_set_options(svc_client, env, options);
+
+ axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
+ payload = build_subscribers_request_om_payload(env, topic);
+ ret_node = axis2_svc_client_send_receive(svc_client, env, payload);
+ if (ret_node)
+ {
+ subscriber_list = process_subscriber_list_node(env, ret_node);
+ }
+ else
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[ML] Stub invoke FAILED: Error code:"
+ " %d :: %s", env->error->error_number,
+ AXIS2_ERROR_GET_MESSAGE(env->error));
+ printf("Retrieving subscriber list FAILED!\n");
+ }
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[savan] End:get_subscriber_list_from_remote_subs_mgr");
+ return subscriber_list;
+}
- store = (axutil_hash_t*)axutil_param_get_value(param, env);
+static axiom_node_t *
+build_subscribers_request_om_payload(
+ const axutil_env_t *env,
+ axis2_char_t *topic)
+{
+ axiom_node_t *om_node = NULL;
+ axiom_element_t* om_ele = NULL;
+ axiom_node_t* topic_om_node = NULL;
+ axiom_element_t * topic_om_ele = NULL;
+ axiom_namespace_t *ns1 = NULL;
+ axis2_char_t *om_str = NULL;
+
+ ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX);
+ om_ele = axiom_element_create(env, NULL, ELEM_NAME_SUBSCRIBERS, ns1, &om_node);
+ topic_om_ele = axiom_element_create(env, om_node, ELEM_NAME_TOPIC, ns1,
+ &topic_om_node);
+ axiom_element_set_text(topic_om_ele, env, topic, topic_om_node);
- return store;
+ om_str = axiom_node_to_string(om_node, env);
+ if (om_str)
+ {
+ printf("\nSending OM : %s\n", om_str);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Sending OM : %s", om_str);
+ AXIS2_FREE(env->allocator, om_str);
+ om_str = NULL;
+ }
+ return om_node;
+}
+
+static axutil_hash_t *
+process_subscriber_list_node(
+ const axutil_env_t *env,
+ axiom_node_t *subs_list_node)
+{
+ axiom_element_t *subs_list_element = NULL;
+ axiom_children_qname_iterator_t *subs_iter = NULL;
+ axutil_hash_t *subscriber_list = axutil_hash_make(env);
+ axutil_qname_t *qname = NULL;
+
+ subs_list_element = axiom_node_get_data_element(subs_list_node, env);
+
+ /* Get Subscriber elements from subscriber list */
+ qname = axutil_qname_create(env, ELEM_NAME_SUBSCRIBE, EVENTING_NAMESPACE, NULL);
+ subs_iter = axiom_element_get_children_with_qname(subs_list_element, env,
+ qname, subs_list_node);
+ if(!subs_iter)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[ML] Subscribers list is empty");
+ return NULL;
+ }
+ while(axiom_children_qname_iterator_has_next(subs_iter, env))
+ {
+ savan_subscriber_t *subscriber = NULL;
+ axiom_node_t *sub_node = NULL;
+ axiom_node_t *endto_node = NULL;
+ axiom_node_t *delivery_node = NULL;
+ axiom_node_t *notify_node = NULL;
+ axiom_node_t *filter_node = NULL;
+ axiom_node_t *expires_node = NULL;
+
+ axiom_element_t *sub_elem = NULL;
+ axiom_element_t *endto_elem = NULL;
+ axiom_element_t *delivery_elem = NULL;
+ axiom_element_t *notify_elem = NULL;
+ axiom_element_t *expires_elem = NULL;
+ axiom_element_t *filter_elem = NULL;
+
+ axis2_char_t *endto = NULL;
+ axis2_char_t *notify = NULL;
+ axis2_char_t *expires = NULL;
+ axis2_char_t *filter = NULL;
+
+ axis2_endpoint_ref_t *endto_epr = NULL;
+ axis2_endpoint_ref_t *notify_epr = NULL;
+
+ sub_node = axiom_children_qname_iterator_next(subs_iter, env);
+ if(sub_node)
+ {
+ subscriber = savan_subscriber_create(env);
+ if (!subscriber)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[ML] Failed to create a"
+ "subscriber instance");
+ return NULL;
+ }
+ /* Now read each sub element of Subscribe element */
+
+ /* EndTo */
+ qname = axutil_qname_create(env, ELEM_NAME_ENDTO, EVENTING_NAMESPACE, NULL);
+ endto_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+ sub_node, &endto_node);
+ axutil_qname_free(qname, env);
+
+ endto = axiom_element_get_text(endto_elem, env, endto_node);
+
+ endto_epr = axis2_endpoint_ref_create(env, endto);
+
+ savan_subscriber_set_end_to(subscriber, env, endto_epr);
+
+ /* Get Delivery element and read NotifyTo */
+ qname = axutil_qname_create(env, ELEM_NAME_DELIVERY, EVENTING_NAMESPACE, NULL);
+ delivery_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+ sub_node, &delivery_node);
+ axutil_qname_free(qname, env);
+
+ qname = axutil_qname_create(env, ELEM_NAME_NOTIFYTO, EVENTING_NAMESPACE, NULL);
+ notify_elem = axiom_element_get_first_child_with_qname(delivery_elem, env, qname,
+ delivery_node, &notify_node);
+ axutil_qname_free(qname, env);
+ notify = axiom_element_get_text(notify_elem, env, notify_node);
+
+ notify_epr = axis2_endpoint_ref_create(env, notify);
+
+ savan_subscriber_set_notify_to(subscriber, env, notify_epr);
+
+ /* Expires */
+ qname = axutil_qname_create(env, ELEM_NAME_EXPIRES, EVENTING_NAMESPACE, NULL);
+ expires_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+ sub_node, &expires_node);
+ axutil_qname_free(qname, env);
+
+ expires = axiom_element_get_text(expires_elem, env, expires_node);
+
+ savan_subscriber_set_expires(subscriber, env, expires);
+
+ /* Filter */
+ qname = axutil_qname_create(env, ELEM_NAME_FILTER, EVENTING_NAMESPACE, NULL);
+ filter_elem = axiom_element_get_first_child_with_qname(sub_elem, env, qname,
+ sub_node, &filter_node);
+ axutil_qname_free(qname, env);
+
+ filter = axiom_element_get_text(filter_elem, env, filter_node);
+
+ savan_subscriber_set_filter(subscriber, env, filter);
+ axutil_hash_set(subscriber_list, savan_subscriber_get_id(subscriber,
+ env), AXIS2_HASH_KEY_STRING, subscriber);
+ }
+ }
+ axutil_qname_free(qname, env);
+ return subscriber_list;
}
+
+
+
+static axiom_node_t *
+build_add_subscriber_om_payload(
+ const axutil_env_t *env,
+ savan_subscriber_t *subscriber)
+{
+ axiom_node_t *add_node = NULL;
+ axiom_element_t* add_ele = NULL;
+ axiom_namespace_t *ns = NULL;
+ axiom_namespace_t *ns1 = NULL;
+ axiom_node_t *sub_node = NULL;
+ axiom_node_t *id_node = NULL;
+ axiom_node_t *topic_node = NULL;
+ axiom_node_t *endto_node = NULL;
+ axiom_node_t *delivery_node = NULL;
+ axiom_node_t *notify_node = NULL;
+ axiom_node_t *filter_node = NULL;
+ axiom_node_t *expires_node = NULL;
+ axiom_element_t* sub_elem = NULL;
+ axiom_element_t* id_elem = NULL;
+ axiom_element_t* topic_elem = NULL;
+ axiom_element_t* endto_elem = NULL;
+ axiom_element_t* delivery_elem = NULL;
+ axiom_element_t* notify_elem = NULL;
+ axiom_element_t* filter_elem = NULL;
+ axiom_element_t* expires_elem = NULL;
+ const axis2_char_t *endto = NULL;
+ const axis2_char_t *notify = NULL;
+ axis2_char_t *filter = NULL;
+ const axis2_char_t *expires = NULL;
+ axis2_char_t *topic = NULL;
+ axis2_char_t *id = NULL;
+
+ axis2_endpoint_ref_t *endto_ref = savan_subscriber_get_end_to(subscriber, env);
+ endto = axis2_endpoint_ref_get_address(endto_ref, env);
+ axis2_endpoint_ref_t *notify_ref = savan_subscriber_get_notify_to(subscriber, env);
+ notify = axis2_endpoint_ref_get_address(notify_ref, env);
+ filter = savan_subscriber_get_filter(subscriber, env);
+ expires = savan_subscriber_get_expires(subscriber, env);
+ id = savan_subscriber_get_id(subscriber, env);
+
+ ns = axiom_namespace_create (env, EVENTING_NAMESPACE, EVENTING_NS_PREFIX);
+ ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX);
+ add_ele = axiom_element_create(env, NULL, ELEM_NAME_ADD_SUBSCRIBER, ns1, &add_node);
+
+ /* create the id element */
+ if(id)
+ {
+ id_elem = axiom_element_create(env, add_node, ELEM_NAME_ID, ns1, &id_node);
+ axiom_element_set_text(id_elem, env, id, id_node);
+ }
+ /* create the topic element */
+ topic_elem = axiom_element_create(env, add_node, ELEM_NAME_TOPIC, ns1, &topic_node);
+ topic = savan_subscriber_get_topic(subscriber, env);
+ if(topic)
+ axiom_element_set_text(topic_elem, env, topic, topic_node);
+ /* create the subscriber element */
+ sub_elem = axiom_element_create(env, add_node, ELEM_NAME_SUBSCRIBE, ns, &sub_node);
+
+ /* EndTo element */
+ endto_elem = axiom_element_create(env, sub_node, ELEM_NAME_ENDTO, ns,
+ &endto_node);
+ axiom_element_set_text(endto_elem, env, endto, endto_node);
+
+ /* Delivery element */
+ delivery_elem = axiom_element_create(env, sub_node, ELEM_NAME_DELIVERY, ns,
+ &delivery_node);
+
+ notify_elem = axiom_element_create(env, delivery_node, ELEM_NAME_NOTIFYTO, ns,
+ &notify_node);
+ axiom_element_set_text(notify_elem, env, notify, notify_node);
+
+ /* Expires element */
+ expires_elem = axiom_element_create(env, sub_node, ELEM_NAME_EXPIRES, ns,
+ &expires_node);
+ axiom_element_set_text(expires_elem, env, expires, expires_node);
+ /* Filter element */
+ filter_elem = axiom_element_create(env, sub_node, ELEM_NAME_FILTER, ns,
+ &endto_node);
+ axiom_element_set_text(filter_elem, env, filter, filter_node);
+
+ return add_node;
+}
+
/******************************************************************************/