You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/08/15 06:29:25 UTC

svn commit: r686142 - /webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c

Author: damitha
Date: Thu Aug 14 21:29:24 2008
New Revision: 686142

URL: http://svn.apache.org/viewvc?rev=686142&view=rev
Log:
Some refactoring of app_msg_processor.c

Modified:
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=686142&r1=686141&r2=686142&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c Thu Aug 14 21:29:24 2008
@@ -2281,6 +2281,182 @@
     return status;
 }
 
+static axis2_status_t
+sandesha2_app_msg_processor_start_create_seq_msg_resender(
+    const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *internal_sequence_id,
+    axis2_char_t *msg_id,
+    const axis2_bool_t is_server_side,
+    int retrans_interval,
+    sandesha2_sender_bean_t *create_sequence_sender_bean,
+    axis2_msg_ctx_t *create_seq_msg_ctx)
+{
+    axutil_thread_t *worker_thread = NULL;
+    sandesha2_app_msg_processor_args_t *args = NULL;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Entry:sandesha2_app_msg_processor_start_create_seq_msg_resender");
+    
+    args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
+    args->env = axutil_init_thread_env(env);
+    args->conf_ctx = conf_ctx;
+    args->internal_sequence_id = internal_sequence_id;
+    args->msg_id = msg_id;
+    args->retrans_interval = retrans_interval;
+    args->is_server_side = is_server_side;
+    args->bean = create_sequence_sender_bean;
+    args->msg_ctx = create_seq_msg_ctx;
+
+    worker_thread = axutil_thread_pool_get_thread(env->thread_pool, 
+            sandesha2_app_msg_processor_create_seq_msg_worker_function, (void*)args);
+    if(!worker_thread)
+    {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_create_seq_msg_resender");
+        return AXIS2_FAILURE;
+    }
+
+    axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+            "[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
+    return AXIS2_SUCCESS;
+}
+
+static void * AXIS2_THREAD_FUNC
+sandesha2_app_msg_processor_create_seq_msg_worker_function(
+    axutil_thread_t *thd, 
+    void *data)
+{
+    sandesha2_app_msg_processor_args_t *args;
+    axutil_env_t *env = NULL;
+    sandesha2_storage_mgr_t *storage_mgr = NULL;
+    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
+    sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
+    sandesha2_sender_mgr_t *sender_mgr = NULL;
+    int retrans_interval = 0;
+    axis2_char_t *dbname = NULL;
+    axis2_conf_ctx_t *conf_ctx = NULL;
+    axis2_char_t *internal_sequence_id = NULL;
+    axis2_bool_t is_server_side = AXIS2_FALSE;
+    sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
+    axis2_char_t *msg_id = NULL;
+    sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+    axis2_bool_t continue_sending = AXIS2_TRUE;
+    axis2_transport_out_desc_t *transport_out = NULL;
+    axis2_transport_sender_t *transport_sender = NULL;
+    axis2_op_t *create_seq_op = NULL;
+
+    args = (sandesha2_app_msg_processor_args_t*) data;
+    env = args->env;
+    axutil_allocator_switch_to_global_pool(env->allocator);
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2] Entry:sandesha2_app_msg_processor_create_seq_msg_worker_function");
+
+    conf_ctx = args->conf_ctx;
+    msg_id = args->msg_id;
+    internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
+    is_server_side = args->is_server_side;
+    retrans_interval = args->retrans_interval;
+    create_sequence_sender_bean = (sandesha2_sender_bean_t *) args->bean;
+    axis2_msg_ctx_t *create_seq_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
+
+    dbname = sandesha2_util_get_dbname(env, conf_ctx);
+    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
+    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
+    create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
+    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
+
+    rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
+            SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+    create_seq_op = axis2_msg_ctx_get_op(create_seq_msg_ctx, env);
+    transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
+    transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+
+    AXIS2_SLEEP(retrans_interval);
+
+    while(!rms_sequence_bean)
+    {
+        continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, create_sequence_sender_bean, 
+                conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+
+        sandesha2_sender_mgr_update(sender_mgr, env, create_sequence_sender_bean);
+
+        if(!continue_sending)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                    "[sandesha2] Do not continue sending the create sequence message");
+            break;
+        }
+
+        AXIS2_SLEEP(retrans_interval);
+
+        if(transport_sender)
+        {
+            /* This is neccessary to avoid a double free */
+            /*axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
+            if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
+            {
+                AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Transport sender invoke failed");
+            }
+        }
+
+        /*if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
+        {
+            status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx, 
+                storage_mgr);
+    
+            if(AXIS2_SUCCESS != status)
+            {
+                break;
+            }
+        }*/
+
+        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
+            SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+    }
+
+    if(rms_sequence_bean)
+    {
+        sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+    }
+
+    if(create_seq_msg_ctx)
+    {
+        axis2_msg_ctx_free(create_seq_msg_ctx, env);
+    }
+
+    if(storage_mgr)
+    {
+        sandesha2_storage_mgr_free(storage_mgr, env);
+    }
+    
+    if(create_seq_mgr)
+    {
+        sandesha2_create_seq_mgr_free(create_seq_mgr, env);
+    }
+    
+    if(sender_mgr)
+    {
+        sandesha2_sender_mgr_free(sender_mgr, env);
+    }
+    
+    if(seq_prop_mgr)
+    {
+        sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
+    }
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2] Exit:sandesha2_app_msg_processor_create_seq_msg_worker_function");
+    
+    axutil_allocator_switch_to_local_pool(env->allocator);
+
+    return NULL;
+}
+
 static axis2_status_t AXIS2_CALL
 sandesha2_app_msg_processor_process_create_seq_response(
     const axutil_env_t *env, 
@@ -2705,6 +2881,7 @@
     if(is_svr_side && sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, from_acks_to_addr) 
             && !to_addr)
     {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came1");
         sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
         if(req_rm_msg_ctx)
         {
@@ -2846,6 +3023,7 @@
 
     if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
     {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came2");
         engine = axis2_engine_create(env, conf_ctx);
         if(axis2_engine_resume_send(engine, env, app_msg_ctx))
         {
@@ -2971,6 +3149,7 @@
     }
     else
     {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came3");
         axis2_msg_ctx_increment_ref(app_msg_ctx, env);
         engine = axis2_engine_create(env, conf_ctx);
         if(axis2_engine_resume_send(engine, env, app_msg_ctx))
@@ -3026,76 +3205,225 @@
     return status;
 }
 
-/* This function will be called in the duplex mode only from within the application message sender thread. */
 static axis2_status_t
-sandesha2_app_msg_processor_resend(
+sandesha2_app_msg_processor_start_application_msg_resender(
     const axutil_env_t *env,
     axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *internal_sequence_id,
     axis2_char_t *msg_id,
-    axis2_bool_t is_svr_side,
-    const axis2_char_t *internal_sequence_id,
-    sandesha2_storage_mgr_t *storage_mgr,
-    sandesha2_seq_property_mgr_t *seq_prop_mgr,
-    sandesha2_create_seq_mgr_t *create_seq_mgr,
-    sandesha2_sender_mgr_t *sender_mgr,
+    const axis2_bool_t is_server_side,
+    int retrans_interval,
     axis2_msg_ctx_t *app_msg_ctx)
 {
-    sandesha2_sender_bean_t *sender_worker_bean = NULL;
-    sandesha2_sender_bean_t *bean1 = NULL;
-    /*axis2_char_t *key = NULL;*/
-    axis2_bool_t continue_sending = AXIS2_TRUE;
-    /*sandesha2_msg_ctx_t *rm_msg_ctx = NULL;*/
-    axis2_transport_out_desc_t *transport_out = NULL;
-    axis2_transport_sender_t *transport_sender = NULL;
-    axis2_bool_t successfully_sent = AXIS2_FALSE;
-    axis2_status_t status = AXIS2_SUCCESS;
-    axis2_bool_t resend = AXIS2_FALSE;
+    axutil_thread_t *worker_thread = NULL;
+    sandesha2_app_msg_processor_args_t *args = NULL;
 
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_app_msg_processor_resend");
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Entry:sandesha2_app_msg_processor_start_application_msg_resender");
+    
+    args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
+    args->env = axutil_init_thread_env(env);
+    args->conf_ctx = conf_ctx;
+    args->internal_sequence_id = internal_sequence_id;
+    args->msg_id = msg_id;
+    args->retrans_interval = retrans_interval;
+    args->is_server_side = is_server_side;
+    args->msg_ctx = app_msg_ctx;
 
-    sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
-    if(!sender_worker_bean)
+    worker_thread = axutil_thread_pool_get_thread(env->thread_pool, 
+            sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
+    if(!worker_thread)
     {
-        AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_application_msg_resender");
         return AXIS2_FAILURE;
     }
 
-    continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean, 
-            conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
-    sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
-    if(!continue_sending)
-    {
-        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                "[sandesha2] Do not continue sending the application message");
-        if(sender_worker_bean)
-        {
-            sandesha2_sender_bean_free(sender_worker_bean, env);
-        }
+    axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
 
-        return AXIS2_FAILURE;
-    }
-    
-    /*rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
-    
-    if(!sandesha2_util_is_ack_already_piggybacked(env, app_rm_msg_ctx))
-    {
-        sandesha2_ack_mgr_piggyback_acks_if_present(env, app_rm_msg_ctx, storage_mgr, seq_prop_mgr, 
-                sender_mgr);
-    }*/
-    
-    transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
-    if(transport_out)
-    {
-        transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
-    }
-    if(transport_sender)
-    {
-        /* This is neccessary to avoid a double free */
-        /*axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
-        if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
-		{
-        	successfully_sent = AXIS2_TRUE;
-		}else
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+            "[sandesha2] Exit:sandesha2_app_msg_processor_start_application_msg_resender");
+    return AXIS2_SUCCESS;
+}
+
+static void * AXIS2_THREAD_FUNC
+sandesha2_app_msg_processor_application_msg_worker_function(
+    axutil_thread_t *thd, 
+    void *data)
+{
+    sandesha2_app_msg_processor_args_t *args;
+    axutil_env_t *env = NULL;
+    sandesha2_storage_mgr_t *storage_mgr = NULL;
+    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
+    sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
+    sandesha2_sender_mgr_t *sender_mgr = NULL;
+    int retrans_interval = 0;
+    axis2_char_t *dbname = NULL;
+    axis2_conf_ctx_t *conf_ctx = NULL;
+    axis2_char_t *internal_sequence_id = NULL;
+    axis2_bool_t is_server_side = AXIS2_FALSE;
+    sandesha2_sender_bean_t *sender_bean = NULL;
+    axis2_char_t *msg_id = NULL;
+    axis2_status_t status = AXIS2_FAILURE;
+    axis2_msg_ctx_t *app_msg_ctx = NULL;
+
+    args = (sandesha2_app_msg_processor_args_t*) data;
+    env = args->env;
+    axutil_allocator_switch_to_global_pool(env->allocator);
+    app_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
+    conf_ctx = args->conf_ctx;
+    msg_id = args->msg_id;
+    internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
+    is_server_side = args->is_server_side;
+    retrans_interval = args->retrans_interval;
+    dbname = sandesha2_util_get_dbname(env, conf_ctx);
+    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
+    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
+    create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
+    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
+
+    while(AXIS2_TRUE)
+    {
+        AXIS2_SLEEP(retrans_interval);
+        sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env, 
+                internal_sequence_id, msg_id);
+        if(!sender_bean)
+        {
+            /* There is no pending message to send. So exit from the thread. */
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                    "[sandesha2] There is no pending message to send. So exit from the thread");
+            break;
+        }
+
+        status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
+                internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr, 
+                sender_mgr, app_msg_ctx);
+
+        if(AXIS2_SUCCESS != status)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                "[sandesha2] Resend failed for  message id %s in sequence %s", msg_id, 
+                internal_sequence_id);
+
+            if(sender_bean)
+            {
+                sandesha2_sender_bean_free(sender_bean, env); 
+            }
+            break;
+        }
+
+        if(sender_bean)
+        {
+            sandesha2_sender_bean_free(sender_bean, env); 
+        }
+    }
+
+    if(app_msg_ctx)
+    {
+        axis2_msg_ctx_free(app_msg_ctx, env);
+    }
+
+    if(storage_mgr)
+    {
+        sandesha2_storage_mgr_free(storage_mgr, env);
+    }
+    
+    if(create_seq_mgr)
+    {
+        sandesha2_create_seq_mgr_free(create_seq_mgr, env);
+    }
+    
+    if(sender_mgr)
+    {
+        sandesha2_sender_mgr_free(sender_mgr, env);
+    }
+    
+    if(seq_prop_mgr)
+    {
+        sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
+    }
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2] Exit:sandesha2_app_msg_processor_application_msg_worker_function");
+    
+    axutil_allocator_switch_to_local_pool(env->allocator);
+    
+    return NULL;
+}
+
+/* This function will be called in the duplex mode only from within the application message sender thread. */
+static axis2_status_t
+sandesha2_app_msg_processor_resend(
+    const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *msg_id,
+    axis2_bool_t is_svr_side,
+    const axis2_char_t *internal_sequence_id,
+    sandesha2_storage_mgr_t *storage_mgr,
+    sandesha2_seq_property_mgr_t *seq_prop_mgr,
+    sandesha2_create_seq_mgr_t *create_seq_mgr,
+    sandesha2_sender_mgr_t *sender_mgr,
+    axis2_msg_ctx_t *app_msg_ctx)
+{
+    sandesha2_sender_bean_t *sender_worker_bean = NULL;
+    sandesha2_sender_bean_t *bean1 = NULL;
+    /*axis2_char_t *key = NULL;*/
+    axis2_bool_t continue_sending = AXIS2_TRUE;
+    /*sandesha2_msg_ctx_t *rm_msg_ctx = NULL;*/
+    axis2_transport_out_desc_t *transport_out = NULL;
+    axis2_transport_sender_t *transport_sender = NULL;
+    axis2_bool_t successfully_sent = AXIS2_FALSE;
+    axis2_status_t status = AXIS2_SUCCESS;
+    axis2_bool_t resend = AXIS2_FALSE;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_app_msg_processor_resend");
+
+    sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+    if(!sender_worker_bean)
+    {
+        AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
+        return AXIS2_FAILURE;
+    }
+
+    continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean, 
+            conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+    sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
+    if(!continue_sending)
+    {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                "[sandesha2] Do not continue sending the application message");
+        if(sender_worker_bean)
+        {
+            sandesha2_sender_bean_free(sender_worker_bean, env);
+        }
+
+        return AXIS2_FAILURE;
+    }
+    
+    /*rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
+    
+    if(!sandesha2_util_is_ack_already_piggybacked(env, app_rm_msg_ctx))
+    {
+        sandesha2_ack_mgr_piggyback_acks_if_present(env, app_rm_msg_ctx, storage_mgr, seq_prop_mgr, 
+                sender_mgr);
+    }*/
+    
+    transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
+    if(transport_out)
+    {
+        transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+    }
+    if(transport_sender)
+    {
+        /* This is neccessary to avoid a double free */
+        /*axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
+        if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
+		{
+        	successfully_sent = AXIS2_TRUE;
+		}else
 		{
         	successfully_sent = AXIS2_FALSE;
 		}
@@ -3343,328 +3671,4 @@
     return AXIS2_SUCCESS;
 }
 
-static axis2_status_t
-sandesha2_app_msg_processor_start_create_seq_msg_resender(
-    const axutil_env_t *env,
-    axis2_conf_ctx_t *conf_ctx,
-    axis2_char_t *internal_sequence_id,
-    axis2_char_t *msg_id,
-    const axis2_bool_t is_server_side,
-    int retrans_interval,
-    sandesha2_sender_bean_t *create_sequence_sender_bean,
-    axis2_msg_ctx_t *create_seq_msg_ctx)
-{
-    axutil_thread_t *worker_thread = NULL;
-    sandesha2_app_msg_processor_args_t *args = NULL;
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-            "[sandesha2] Entry:sandesha2_app_msg_processor_start_create_seq_msg_resender");
-    
-    args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
-    args->env = axutil_init_thread_env(env);
-    args->conf_ctx = conf_ctx;
-    args->internal_sequence_id = internal_sequence_id;
-    args->msg_id = msg_id;
-    args->retrans_interval = retrans_interval;
-    args->is_server_side = is_server_side;
-    args->bean = create_sequence_sender_bean;
-    args->msg_ctx = create_seq_msg_ctx;
-
-    worker_thread = axutil_thread_pool_get_thread(env->thread_pool, 
-            sandesha2_app_msg_processor_create_seq_msg_worker_function, (void*)args);
-    if(!worker_thread)
-    {
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
-            "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_create_seq_msg_resender");
-        return AXIS2_FAILURE;
-    }
-
-    axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
-            "[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
-    return AXIS2_SUCCESS;
-}
-
-static void * AXIS2_THREAD_FUNC
-sandesha2_app_msg_processor_create_seq_msg_worker_function(
-    axutil_thread_t *thd, 
-    void *data)
-{
-    sandesha2_app_msg_processor_args_t *args;
-    axutil_env_t *env = NULL;
-    sandesha2_storage_mgr_t *storage_mgr = NULL;
-    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
-    sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
-    sandesha2_sender_mgr_t *sender_mgr = NULL;
-    int retrans_interval = 0;
-    axis2_char_t *dbname = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
-    axis2_char_t *internal_sequence_id = NULL;
-    axis2_bool_t is_server_side = AXIS2_FALSE;
-    sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
-    axis2_char_t *msg_id = NULL;
-    sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
-    axis2_bool_t continue_sending = AXIS2_TRUE;
-    axis2_transport_out_desc_t *transport_out = NULL;
-    axis2_transport_sender_t *transport_sender = NULL;
-    axis2_op_t *create_seq_op = NULL;
-
-    args = (sandesha2_app_msg_processor_args_t*) data;
-    env = args->env;
-    axutil_allocator_switch_to_global_pool(env->allocator);
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-        "[sandesha2] Entry:sandesha2_app_msg_processor_create_seq_msg_worker_function");
-
-    conf_ctx = args->conf_ctx;
-    msg_id = args->msg_id;
-    internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
-    is_server_side = args->is_server_side;
-    retrans_interval = args->retrans_interval;
-    create_sequence_sender_bean = (sandesha2_sender_bean_t *) args->bean;
-    axis2_msg_ctx_t *create_seq_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
-
-    dbname = sandesha2_util_get_dbname(env, conf_ctx);
-    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
-    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
-    create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
-    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
-
-    rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
-            SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
-
-    create_seq_op = axis2_msg_ctx_get_op(create_seq_msg_ctx, env);
-    transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
-    transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
-
-    AXIS2_SLEEP(retrans_interval);
-
-    while(!rms_sequence_bean)
-    {
-        continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, create_sequence_sender_bean, 
-                conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
-
-        sandesha2_sender_mgr_update(sender_mgr, env, create_sequence_sender_bean);
-
-        if(!continue_sending)
-        {
-            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                    "[sandesha2] Do not continue sending the create sequence message");
-            break;
-        }
-
-        AXIS2_SLEEP(retrans_interval);
-
-        if(transport_sender)
-        {
-            /* This is neccessary to avoid a double free */
-            /*axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);*/
-            if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
-            {
-                AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Transport sender invoke failed");
-            }
-        }
-
-        /*if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
-        {
-            status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx, 
-                storage_mgr);
-    
-            if(AXIS2_SUCCESS != status)
-            {
-                break;
-            }
-        }*/
-
-        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
-            SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
-    }
-
-    if(rms_sequence_bean)
-    {
-        sandesha2_seq_property_bean_free(rms_sequence_bean, env);
-    }
-
-    if(create_seq_msg_ctx)
-    {
-        axis2_msg_ctx_free(create_seq_msg_ctx, env);
-    }
-
-    if(storage_mgr)
-    {
-        sandesha2_storage_mgr_free(storage_mgr, env);
-    }
-    
-    if(create_seq_mgr)
-    {
-        sandesha2_create_seq_mgr_free(create_seq_mgr, env);
-    }
-    
-    if(sender_mgr)
-    {
-        sandesha2_sender_mgr_free(sender_mgr, env);
-    }
-    
-    if(seq_prop_mgr)
-    {
-        sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
-    }
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-        "[sandesha2] Exit:sandesha2_app_msg_processor_create_seq_msg_worker_function");
-    
-    axutil_allocator_switch_to_local_pool(env->allocator);
-
-    return NULL;
-}
-
-static axis2_status_t
-sandesha2_app_msg_processor_start_application_msg_resender(
-    const axutil_env_t *env,
-    axis2_conf_ctx_t *conf_ctx,
-    axis2_char_t *internal_sequence_id,
-    axis2_char_t *msg_id,
-    const axis2_bool_t is_server_side,
-    int retrans_interval,
-    axis2_msg_ctx_t *app_msg_ctx)
-{
-    axutil_thread_t *worker_thread = NULL;
-    sandesha2_app_msg_processor_args_t *args = NULL;
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-            "[sandesha2] Entry:sandesha2_app_msg_processor_start_application_msg_resender");
-    
-    args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
-    args->env = axutil_init_thread_env(env);
-    args->conf_ctx = conf_ctx;
-    args->internal_sequence_id = internal_sequence_id;
-    args->msg_id = msg_id;
-    args->retrans_interval = retrans_interval;
-    args->is_server_side = is_server_side;
-    args->msg_ctx = app_msg_ctx;
-
-    worker_thread = axutil_thread_pool_get_thread(env->thread_pool, 
-            sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
-    if(!worker_thread)
-    {
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
-            "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_application_msg_resender");
-        return AXIS2_FAILURE;
-    }
-
-    axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
-            "[sandesha2] Exit:sandesha2_app_msg_processor_start_application_msg_resender");
-    return AXIS2_SUCCESS;
-}
-
-static void * AXIS2_THREAD_FUNC
-sandesha2_app_msg_processor_application_msg_worker_function(
-    axutil_thread_t *thd, 
-    void *data)
-{
-    sandesha2_app_msg_processor_args_t *args;
-    axutil_env_t *env = NULL;
-    sandesha2_storage_mgr_t *storage_mgr = NULL;
-    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
-    sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
-    sandesha2_sender_mgr_t *sender_mgr = NULL;
-    int retrans_interval = 0;
-    axis2_char_t *dbname = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
-    axis2_char_t *internal_sequence_id = NULL;
-    axis2_bool_t is_server_side = AXIS2_FALSE;
-    sandesha2_sender_bean_t *sender_bean = NULL;
-    axis2_char_t *msg_id = NULL;
-    axis2_status_t status = AXIS2_FAILURE;
-    axis2_msg_ctx_t *app_msg_ctx = NULL;
-
-    args = (sandesha2_app_msg_processor_args_t*) data;
-    env = args->env;
-    axutil_allocator_switch_to_global_pool(env->allocator);
-    app_msg_ctx = (axis2_msg_ctx_t *) args->msg_ctx;
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-        "[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
-    conf_ctx = args->conf_ctx;
-    msg_id = args->msg_id;
-    internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
-    is_server_side = args->is_server_side;
-    retrans_interval = args->retrans_interval;
-    dbname = sandesha2_util_get_dbname(env, conf_ctx);
-    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
-    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
-    create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
-    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
-
-    while(AXIS2_TRUE)
-    {
-        AXIS2_SLEEP(retrans_interval);
-        sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env, 
-                internal_sequence_id, msg_id);
-        if(!sender_bean)
-        {
-            /* There is no pending message to send. So exit from the thread. */
-            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                    "[sandesha2] There is no pending message to send. So exit from the thread");
-            break;
-        }
-
-        status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
-                internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr, 
-                sender_mgr, app_msg_ctx);
-
-        if(AXIS2_SUCCESS != status)
-        {
-            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                "[sandesha2] Resend failed for  message id %s in sequence %s", msg_id, 
-                internal_sequence_id);
-
-            if(sender_bean)
-            {
-                sandesha2_sender_bean_free(sender_bean, env); 
-            }
-            break;
-        }
-
-        if(sender_bean)
-        {
-            sandesha2_sender_bean_free(sender_bean, env); 
-        }
-    }
-
-    if(app_msg_ctx)
-    {
-        axis2_msg_ctx_free(app_msg_ctx, env);
-    }
-
-    if(storage_mgr)
-    {
-        sandesha2_storage_mgr_free(storage_mgr, env);
-    }
-    
-    if(create_seq_mgr)
-    {
-        sandesha2_create_seq_mgr_free(create_seq_mgr, env);
-    }
-    
-    if(sender_mgr)
-    {
-        sandesha2_sender_mgr_free(sender_mgr, env);
-    }
-    
-    if(seq_prop_mgr)
-    {
-        sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
-    }
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-        "[sandesha2] Exit:sandesha2_app_msg_processor_application_msg_worker_function");
-    
-    axutil_allocator_switch_to_local_pool(env->allocator);
-    
-    return NULL;
-}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org