You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/08/15 06:29:25 UTC
svn commit: r686142 -
/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
Author: damitha
Date: Thu Aug 14 21:29:24 2008
New Revision: 686142
URL: http://svn.apache.org/viewvc?rev=686142&view=rev
Log:
Some refactoring of app_msg_processor.c
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=686142&r1=686141&r2=686142&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c Thu Aug 14 21:29:24 2008
@@ -2281,6 +2281,182 @@
return status;
}
+static axis2_status_t /* Bundles its arguments and starts a detached thread-pool worker running sandesha2_app_msg_processor_create_seq_msg_worker_function; returns AXIS2_FAILURE when no thread is obtained, AXIS2_SUCCESS otherwise. */
+sandesha2_app_msg_processor_start_create_seq_msg_resender(
+ const axutil_env_t *env, /* caller's environment; a thread environment derived from it is handed to the worker */
+ axis2_conf_ctx_t *conf_ctx, /* stored in the bundle unchanged */
+ axis2_char_t *internal_sequence_id, /* pointer is stored as-is; no copy is made in this function */
+ axis2_char_t *msg_id, /* pointer is stored as-is; no copy is made in this function */
+ const axis2_bool_t is_server_side,
+ int retrans_interval, /* the worker passes this value to AXIS2_SLEEP */
+ sandesha2_sender_bean_t *create_sequence_sender_bean, /* stored in args->bean */
+ axis2_msg_ctx_t *create_seq_msg_ctx) /* stored in args->msg_ctx; the worker function frees it before returning */
+{
+ axutil_thread_t *worker_thread = NULL;
+ sandesha2_app_msg_processor_args_t *args = NULL;
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Entry:sandesha2_app_msg_processor_start_create_seq_msg_resender");
+ /* Pack everything into one heap struct: the worker receives its input through a single void* parameter. */
+ args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t)); /* NOTE(review): the result is dereferenced below without a NULL check */
+ args->env = axutil_init_thread_env(env);
+ args->conf_ctx = conf_ctx;
+ args->internal_sequence_id = internal_sequence_id;
+ args->msg_id = msg_id;
+ args->retrans_interval = retrans_interval;
+ args->is_server_side = is_server_side;
+ args->bean = create_sequence_sender_bean;
+ args->msg_ctx = create_seq_msg_ctx;
+ /* Ask the environment's thread pool to run the worker with the bundle as its argument. */
+ worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
+ sandesha2_app_msg_processor_create_seq_msg_worker_function, (void*)args);
+ if(!worker_thread)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_create_seq_msg_resender");
+ return AXIS2_FAILURE; /* NOTE(review): args and args->env are not released on this path */
+ }
+ /* The thread is detached; nothing in this function waits for it to finish. */
+ axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
+ return AXIS2_SUCCESS;
+}
+
+static void * AXIS2_THREAD_FUNC /* Thread entry point: keeps invoking the transport sender for the create-sequence message until an RMS sequence-id property exists for the internal sequence or the retransmission adjuster says to stop, then frees the objects it holds. Always returns NULL. */
+sandesha2_app_msg_processor_create_seq_msg_worker_function(
+ axutil_thread_t *thd, /* not referenced in the body */
+ void *data) /* cast to sandesha2_app_msg_processor_args_t* below */
+{
+ sandesha2_app_msg_processor_args_t *args;
+ axutil_env_t *env = NULL;
+ sandesha2_storage_mgr_t *storage_mgr = NULL;
+ sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
+ sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
+ sandesha2_sender_mgr_t *sender_mgr = NULL;
+ int retrans_interval = 0;
+ axis2_char_t *dbname = NULL;
+ axis2_conf_ctx_t *conf_ctx = NULL;
+ axis2_char_t *internal_sequence_id = NULL;
+ axis2_bool_t is_server_side = AXIS2_FALSE;
+ sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
+ axis2_char_t *msg_id = NULL;
+ sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+ axis2_bool_t continue_sending = AXIS2_TRUE;
+ axis2_transport_out_desc_t *transport_out = NULL;
+ axis2_transport_sender_t *transport_sender = NULL;
+ axis2_op_t *create_seq_op = NULL;
+ /* Unpack the argument bundle. */
+ args = (sandesha2_app_msg_processor_args_t*) data;
+ env = args->env;
+ axutil_allocator_switch_to_global_pool(env->allocator); /* paired with axutil_allocator_switch_to_local_pool() just before return */
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Entry:sandesha2_app_msg_processor_create_seq_msg_worker_function");
+
+ conf_ctx = args->conf_ctx;
+ msg_id = args->msg_id; /* copied out but not used afterwards in this function */
+ internal_sequence_id = axutil_strdup(env, args->internal_sequence_id); /* private copy; NOTE(review): no matching free is visible in this function */
+ is_server_side = args->is_server_side; /* copied out but not used afterwards in this function */
+ retrans_interval = args->retrans_interval;
+ create_sequence_sender_bean = (sandesha2_sender_bean_t *) args->bean;
+ axis2_msg_ctx_t *create_seq_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx; /* declared here, after statements, rather than with the other locals */
+ /* Create the storage and manager handles from the name returned by sandesha2_util_get_dbname(). */
+ dbname = sandesha2_util_get_dbname(env, conf_ctx);
+ storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
+ seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
+ create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
+ sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
+ /* Look up the RMS sequence-id property; the loop below runs only while this lookup returns NULL. */
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
+ SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+ create_seq_op = axis2_msg_ctx_get_op(create_seq_msg_ctx, env); /* result is not used afterwards in this function */
+ transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
+ transport_sender = axis2_transport_out_desc_get_sender(transport_out, env); /* NOTE(review): transport_out is passed on without a NULL check */
+
+ AXIS2_SLEEP(retrans_interval); /* initial wait before the retry loop is entered */
+
+ while(!rms_sequence_bean)
+ {
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, create_sequence_sender_bean,
+ conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+
+ sandesha2_sender_mgr_update(sender_mgr, env, create_sequence_sender_bean); /* called on every pass, including the pass that leaves the loop through the break below */
+
+ if(!continue_sending)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Do not continue sending the create sequence message");
+ break;
+ }
+
+ AXIS2_SLEEP(retrans_interval); /* wait again before this pass's send */
+
+ if(transport_sender)
+ {
+ /* This is necessary to avoid a double free */
+ /*axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
+ if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Transport sender invoke failed"); /* a failed send is only logged; the loop carries on */
+ }
+ }
+
+ /*if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
+ {
+ status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx,
+ storage_mgr);
+
+ if(AXIS2_SUCCESS != status)
+ {
+ break;
+ }
+ }*/
+ /* Repeat the property lookup to decide whether another pass is needed. */
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
+ SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+ }
+ /* Release the objects this worker holds. */
+ if(rms_sequence_bean)
+ {
+ sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+ }
+
+ if(create_seq_msg_ctx)
+ {
+ axis2_msg_ctx_free(create_seq_msg_ctx, env); /* the message context received through args->msg_ctx is freed here */
+ }
+
+ if(storage_mgr)
+ {
+ sandesha2_storage_mgr_free(storage_mgr, env);
+ }
+
+ if(create_seq_mgr)
+ {
+ sandesha2_create_seq_mgr_free(create_seq_mgr, env);
+ }
+
+ if(sender_mgr)
+ {
+ sandesha2_sender_mgr_free(sender_mgr, env);
+ }
+
+ if(seq_prop_mgr)
+ {
+ sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
+ }
+ /* NOTE(review): args, args->env and the internal_sequence_id copy are not freed anywhere in this function. */
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Exit:sandesha2_app_msg_processor_create_seq_msg_worker_function");
+
+ axutil_allocator_switch_to_local_pool(env->allocator);
+
+ return NULL;
+}
+
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_create_seq_response(
const axutil_env_t *env,
@@ -2705,6 +2881,7 @@
if(is_svr_side && sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, from_acks_to_addr)
&& !to_addr)
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came1");
sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
if(req_rm_msg_ctx)
{
@@ -2846,6 +3023,7 @@
if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came2");
engine = axis2_engine_create(env, conf_ctx);
if(axis2_engine_resume_send(engine, env, app_msg_ctx))
{
@@ -2971,6 +3149,7 @@
}
else
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came3");
axis2_msg_ctx_increment_ref(app_msg_ctx, env);
engine = axis2_engine_create(env, conf_ctx);
if(axis2_engine_resume_send(engine, env, app_msg_ctx))
@@ -3026,76 +3205,225 @@
return status;
}
-/* This function will be called in the duplex mode only from within the application message sender thread. */
static axis2_status_t
-sandesha2_app_msg_processor_resend(
+sandesha2_app_msg_processor_start_application_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
- axis2_bool_t is_svr_side,
- const axis2_char_t *internal_sequence_id,
- sandesha2_storage_mgr_t *storage_mgr,
- sandesha2_seq_property_mgr_t *seq_prop_mgr,
- sandesha2_create_seq_mgr_t *create_seq_mgr,
- sandesha2_sender_mgr_t *sender_mgr,
+ const axis2_bool_t is_server_side,
+ int retrans_interval,
axis2_msg_ctx_t *app_msg_ctx)
{
- sandesha2_sender_bean_t *sender_worker_bean = NULL;
- sandesha2_sender_bean_t *bean1 = NULL;
- /*axis2_char_t *key = NULL;*/
- axis2_bool_t continue_sending = AXIS2_TRUE;
- /*sandesha2_msg_ctx_t *rm_msg_ctx = NULL;*/
- axis2_transport_out_desc_t *transport_out = NULL;
- axis2_transport_sender_t *transport_sender = NULL;
- axis2_bool_t successfully_sent = AXIS2_FALSE;
- axis2_status_t status = AXIS2_SUCCESS;
- axis2_bool_t resend = AXIS2_FALSE;
+ axutil_thread_t *worker_thread = NULL;
+ sandesha2_app_msg_processor_args_t *args = NULL;
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_app_msg_processor_resend");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Entry:sandesha2_app_msg_processor_start_application_msg_resender");
+
+ args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
+ args->env = axutil_init_thread_env(env);
+ args->conf_ctx = conf_ctx;
+ args->internal_sequence_id = internal_sequence_id;
+ args->msg_id = msg_id;
+ args->retrans_interval = retrans_interval;
+ args->is_server_side = is_server_side;
+ args->msg_ctx = app_msg_ctx;
- sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
- if(!sender_worker_bean)
+ worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
+ sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
+ if(!worker_thread)
{
- AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_application_msg_resender");
return AXIS2_FAILURE;
}
- continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean,
- conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
- sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
- if(!continue_sending)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Do not continue sending the application message");
- if(sender_worker_bean)
- {
- sandesha2_sender_bean_free(sender_worker_bean, env);
- }
+ axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
- return AXIS2_FAILURE;
- }
-
- /*rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
-
- if(!sandesha2_util_is_ack_already_piggybacked(env, app_rm_msg_ctx))
- {
- sandesha2_ack_mgr_piggyback_acks_if_present(env, app_rm_msg_ctx, storage_mgr, seq_prop_mgr,
- sender_mgr);
- }*/
-
- transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
- if(transport_out)
- {
- transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
- }
- if(transport_sender)
- {
- /* This is neccessary to avoid a double free */
- /*axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
- if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
- {
- successfully_sent = AXIS2_TRUE;
- }else
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Exit:sandesha2_app_msg_processor_start_application_msg_resender");
+ return AXIS2_SUCCESS;
+}
+
+static void * AXIS2_THREAD_FUNC /* Thread entry point: after each AXIS2_SLEEP(retrans_interval), asks the sender manager for an application message to send and calls sandesha2_app_msg_processor_resend; stops when no message is returned or the resend does not return AXIS2_SUCCESS. Always returns NULL. */
+sandesha2_app_msg_processor_application_msg_worker_function(
+ axutil_thread_t *thd, /* not referenced in the body */
+ void *data) /* cast to sandesha2_app_msg_processor_args_t* below */
+{
+ sandesha2_app_msg_processor_args_t *args;
+ axutil_env_t *env = NULL;
+ sandesha2_storage_mgr_t *storage_mgr = NULL;
+ sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
+ sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
+ sandesha2_sender_mgr_t *sender_mgr = NULL;
+ int retrans_interval = 0;
+ axis2_char_t *dbname = NULL;
+ axis2_conf_ctx_t *conf_ctx = NULL;
+ axis2_char_t *internal_sequence_id = NULL;
+ axis2_bool_t is_server_side = AXIS2_FALSE;
+ sandesha2_sender_bean_t *sender_bean = NULL;
+ axis2_char_t *msg_id = NULL;
+ axis2_status_t status = AXIS2_FAILURE;
+ axis2_msg_ctx_t *app_msg_ctx = NULL;
+ /* Unpack the argument bundle. */
+ args = (sandesha2_app_msg_processor_args_t*) data;
+ env = args->env;
+ axutil_allocator_switch_to_global_pool(env->allocator); /* paired with axutil_allocator_switch_to_local_pool() just before return */
+ app_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
+ conf_ctx = args->conf_ctx;
+ msg_id = args->msg_id; /* pointer is used as-is for the lifetime of the loop; it is not copied */
+ internal_sequence_id = axutil_strdup(env, args->internal_sequence_id); /* private copy; NOTE(review): no matching free is visible in this function */
+ is_server_side = args->is_server_side;
+ retrans_interval = args->retrans_interval;
+ dbname = sandesha2_util_get_dbname(env, conf_ctx);
+ storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
+ seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
+ create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
+ sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
+ /* Each pass: sleep, check for a message to send, resend; the two breaks below are the only exits. */
+ while(AXIS2_TRUE)
+ {
+ AXIS2_SLEEP(retrans_interval);
+ sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
+ internal_sequence_id, msg_id);
+ if(!sender_bean)
+ {
+ /* There is no pending message to send. So exit from the thread. */
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] There is no pending message to send. So exit from the thread");
+ break;
+ }
+ /* sender_bean itself is not passed on; the resend call is given msg_id and the manager handles. */
+ status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
+ internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr,
+ sender_mgr, app_msg_ctx);
+
+ if(AXIS2_SUCCESS != status)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
+ internal_sequence_id);
+
+ if(sender_bean) /* always true here: the NULL case already left the loop above */
+ {
+ sandesha2_sender_bean_free(sender_bean, env);
+ }
+ break;
+ }
+
+ if(sender_bean) /* always true here: the NULL case already left the loop above */
+ {
+ sandesha2_sender_bean_free(sender_bean, env);
+ }
+ }
+ /* Release the objects this worker holds. */
+ if(app_msg_ctx)
+ {
+ axis2_msg_ctx_free(app_msg_ctx, env); /* the message context received through args->msg_ctx is freed here */
+ }
+
+ if(storage_mgr)
+ {
+ sandesha2_storage_mgr_free(storage_mgr, env);
+ }
+
+ if(create_seq_mgr)
+ {
+ sandesha2_create_seq_mgr_free(create_seq_mgr, env);
+ }
+
+ if(sender_mgr)
+ {
+ sandesha2_sender_mgr_free(sender_mgr, env);
+ }
+
+ if(seq_prop_mgr)
+ {
+ sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
+ }
+ /* NOTE(review): args, args->env and the internal_sequence_id copy are not freed anywhere in this function. */
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Exit:sandesha2_app_msg_processor_application_msg_worker_function");
+
+ axutil_allocator_switch_to_local_pool(env->allocator);
+
+ return NULL;
+}
+
+/* This function will be called in the duplex mode only from within the application message sender thread. */
+static axis2_status_t
+sandesha2_app_msg_processor_resend(
+ const axutil_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *msg_id,
+ axis2_bool_t is_svr_side,
+ const axis2_char_t *internal_sequence_id,
+ sandesha2_storage_mgr_t *storage_mgr,
+ sandesha2_seq_property_mgr_t *seq_prop_mgr,
+ sandesha2_create_seq_mgr_t *create_seq_mgr,
+ sandesha2_sender_mgr_t *sender_mgr,
+ axis2_msg_ctx_t *app_msg_ctx)
+{
+ sandesha2_sender_bean_t *sender_worker_bean = NULL;
+ sandesha2_sender_bean_t *bean1 = NULL;
+ /*axis2_char_t *key = NULL;*/
+ axis2_bool_t continue_sending = AXIS2_TRUE;
+ /*sandesha2_msg_ctx_t *rm_msg_ctx = NULL;*/
+ axis2_transport_out_desc_t *transport_out = NULL;
+ axis2_transport_sender_t *transport_sender = NULL;
+ axis2_bool_t successfully_sent = AXIS2_FALSE;
+ axis2_status_t status = AXIS2_SUCCESS;
+ axis2_bool_t resend = AXIS2_FALSE;
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_app_msg_processor_resend");
+
+ sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+ if(!sender_worker_bean)
+ {
+ AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
+ return AXIS2_FAILURE;
+ }
+
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean,
+ conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+ sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
+ if(!continue_sending)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Do not continue sending the application message");
+ if(sender_worker_bean)
+ {
+ sandesha2_sender_bean_free(sender_worker_bean, env);
+ }
+
+ return AXIS2_FAILURE;
+ }
+
+ /*rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
+
+ if(!sandesha2_util_is_ack_already_piggybacked(env, app_rm_msg_ctx))
+ {
+ sandesha2_ack_mgr_piggyback_acks_if_present(env, app_rm_msg_ctx, storage_mgr, seq_prop_mgr,
+ sender_mgr);
+ }*/
+
+ transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
+ if(transport_out)
+ {
+ transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+ }
+ if(transport_sender)
+ {
+ /* This is neccessary to avoid a double free */
+ /*axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
+ if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
+ {
+ successfully_sent = AXIS2_TRUE;
+ }else
{
successfully_sent = AXIS2_FALSE;
}
@@ -3343,328 +3671,4 @@
return AXIS2_SUCCESS;
}
-static axis2_status_t
-sandesha2_app_msg_processor_start_create_seq_msg_resender(
- const axutil_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *internal_sequence_id,
- axis2_char_t *msg_id,
- const axis2_bool_t is_server_side,
- int retrans_interval,
- sandesha2_sender_bean_t *create_sequence_sender_bean,
- axis2_msg_ctx_t *create_seq_msg_ctx)
-{
- axutil_thread_t *worker_thread = NULL;
- sandesha2_app_msg_processor_args_t *args = NULL;
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_app_msg_processor_start_create_seq_msg_resender");
-
- args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
- args->env = axutil_init_thread_env(env);
- args->conf_ctx = conf_ctx;
- args->internal_sequence_id = internal_sequence_id;
- args->msg_id = msg_id;
- args->retrans_interval = retrans_interval;
- args->is_server_side = is_server_side;
- args->bean = create_sequence_sender_bean;
- args->msg_ctx = create_seq_msg_ctx;
-
- worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
- sandesha2_app_msg_processor_create_seq_msg_worker_function, (void*)args);
- if(!worker_thread)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
- "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_create_seq_msg_resender");
- return AXIS2_FAILURE;
- }
-
- axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
- return AXIS2_SUCCESS;
-}
-
-static void * AXIS2_THREAD_FUNC
-sandesha2_app_msg_processor_create_seq_msg_worker_function(
- axutil_thread_t *thd,
- void *data)
-{
- sandesha2_app_msg_processor_args_t *args;
- axutil_env_t *env = NULL;
- sandesha2_storage_mgr_t *storage_mgr = NULL;
- sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
- sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
- sandesha2_sender_mgr_t *sender_mgr = NULL;
- int retrans_interval = 0;
- axis2_char_t *dbname = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
- axis2_char_t *internal_sequence_id = NULL;
- axis2_bool_t is_server_side = AXIS2_FALSE;
- sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
- axis2_char_t *msg_id = NULL;
- sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
- axis2_bool_t continue_sending = AXIS2_TRUE;
- axis2_transport_out_desc_t *transport_out = NULL;
- axis2_transport_sender_t *transport_sender = NULL;
- axis2_op_t *create_seq_op = NULL;
-
- args = (sandesha2_app_msg_processor_args_t*) data;
- env = args->env;
- axutil_allocator_switch_to_global_pool(env->allocator);
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_app_msg_processor_create_seq_msg_worker_function");
-
- conf_ctx = args->conf_ctx;
- msg_id = args->msg_id;
- internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
- is_server_side = args->is_server_side;
- retrans_interval = args->retrans_interval;
- create_sequence_sender_bean = (sandesha2_sender_bean_t *) args->bean;
- axis2_msg_ctx_t *create_seq_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
-
- dbname = sandesha2_util_get_dbname(env, conf_ctx);
- storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
- seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
- create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
- sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
-
- rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
- SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
-
- create_seq_op = axis2_msg_ctx_get_op(create_seq_msg_ctx, env);
- transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
- transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
-
- AXIS2_SLEEP(retrans_interval);
-
- while(!rms_sequence_bean)
- {
- continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, create_sequence_sender_bean,
- conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
-
- sandesha2_sender_mgr_update(sender_mgr, env, create_sequence_sender_bean);
-
- if(!continue_sending)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Do not continue sending the create sequence message");
- break;
- }
-
- AXIS2_SLEEP(retrans_interval);
-
- if(transport_sender)
- {
- /* This is neccessary to avoid a double free */
- /*axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
- if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Transport sender invoke failed");
- }
- }
-
- /*if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
- {
- status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx,
- storage_mgr);
-
- if(AXIS2_SUCCESS != status)
- {
- break;
- }
- }*/
-
- rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
- SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
- }
-
- if(rms_sequence_bean)
- {
- sandesha2_seq_property_bean_free(rms_sequence_bean, env);
- }
-
- if(create_seq_msg_ctx)
- {
- axis2_msg_ctx_free(create_seq_msg_ctx, env);
- }
-
- if(storage_mgr)
- {
- sandesha2_storage_mgr_free(storage_mgr, env);
- }
-
- if(create_seq_mgr)
- {
- sandesha2_create_seq_mgr_free(create_seq_mgr, env);
- }
-
- if(sender_mgr)
- {
- sandesha2_sender_mgr_free(sender_mgr, env);
- }
-
- if(seq_prop_mgr)
- {
- sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
- }
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_app_msg_processor_create_seq_msg_worker_function");
-
- axutil_allocator_switch_to_local_pool(env->allocator);
-
- return NULL;
-}
-
-static axis2_status_t
-sandesha2_app_msg_processor_start_application_msg_resender(
- const axutil_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *internal_sequence_id,
- axis2_char_t *msg_id,
- const axis2_bool_t is_server_side,
- int retrans_interval,
- axis2_msg_ctx_t *app_msg_ctx)
-{
- axutil_thread_t *worker_thread = NULL;
- sandesha2_app_msg_processor_args_t *args = NULL;
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_app_msg_processor_start_application_msg_resender");
-
- args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
- args->env = axutil_init_thread_env(env);
- args->conf_ctx = conf_ctx;
- args->internal_sequence_id = internal_sequence_id;
- args->msg_id = msg_id;
- args->retrans_interval = retrans_interval;
- args->is_server_side = is_server_side;
- args->msg_ctx = app_msg_ctx;
-
- worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
- sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
- if(!worker_thread)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
- "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_application_msg_resender");
- return AXIS2_FAILURE;
- }
-
- axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_app_msg_processor_start_application_msg_resender");
- return AXIS2_SUCCESS;
-}
-
-static void * AXIS2_THREAD_FUNC
-sandesha2_app_msg_processor_application_msg_worker_function(
- axutil_thread_t *thd,
- void *data)
-{
- sandesha2_app_msg_processor_args_t *args;
- axutil_env_t *env = NULL;
- sandesha2_storage_mgr_t *storage_mgr = NULL;
- sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
- sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
- sandesha2_sender_mgr_t *sender_mgr = NULL;
- int retrans_interval = 0;
- axis2_char_t *dbname = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
- axis2_char_t *internal_sequence_id = NULL;
- axis2_bool_t is_server_side = AXIS2_FALSE;
- sandesha2_sender_bean_t *sender_bean = NULL;
- axis2_char_t *msg_id = NULL;
- axis2_status_t status = AXIS2_FAILURE;
- axis2_msg_ctx_t *app_msg_ctx = NULL;
-
- args = (sandesha2_app_msg_processor_args_t*) data;
- env = args->env;
- axutil_allocator_switch_to_global_pool(env->allocator);
- app_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
- conf_ctx = args->conf_ctx;
- msg_id = args->msg_id;
- internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
- is_server_side = args->is_server_side;
- retrans_interval = args->retrans_interval;
- dbname = sandesha2_util_get_dbname(env, conf_ctx);
- storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
- seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
- create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
- sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
-
- while(AXIS2_TRUE)
- {
- AXIS2_SLEEP(retrans_interval);
- sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
- internal_sequence_id, msg_id);
- if(!sender_bean)
- {
- /* There is no pending message to send. So exit from the thread. */
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] There is no pending message to send. So exit from the thread");
- break;
- }
-
- status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
- internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr,
- sender_mgr, app_msg_ctx);
-
- if(AXIS2_SUCCESS != status)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
- internal_sequence_id);
-
- if(sender_bean)
- {
- sandesha2_sender_bean_free(sender_bean, env);
- }
- break;
- }
-
- if(sender_bean)
- {
- sandesha2_sender_bean_free(sender_bean, env);
- }
- }
-
- if(app_msg_ctx)
- {
- axis2_msg_ctx_free(app_msg_ctx, env);
- }
-
- if(storage_mgr)
- {
- sandesha2_storage_mgr_free(storage_mgr, env);
- }
-
- if(create_seq_mgr)
- {
- sandesha2_create_seq_mgr_free(create_seq_mgr, env);
- }
-
- if(sender_mgr)
- {
- sandesha2_sender_mgr_free(sender_mgr, env);
- }
-
- if(seq_prop_mgr)
- {
- sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
- }
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_app_msg_processor_application_msg_worker_function");
-
- axutil_allocator_switch_to_local_pool(env->allocator);
-
- return NULL;
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org