You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/06/15 11:11:38 UTC

svn commit: r667935 - in /webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008: include/sandesha2_constants.h src/msgprocessors/ack_msg_processor.c src/msgprocessors/app_msg_processor.c src/util/terminate_mgr.c

Author: damitha
Date: Sun Jun 15 02:11:37 2008
New Revision: 667935

URL: http://svn.apache.org/viewvc?rev=667935&view=rev
Log:
Fixed terminate resending for replay model.

Modified:
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h Sun Jun 15 02:11:37 2008
@@ -290,6 +290,8 @@
     #define SANDESHA2_SEQ_PROP_LAST_IN_MESSAGE_ID  "LastInMessageId"
 
     #define SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_ID "HighestInMsgId"
+    
+    #define SANDESHA2_SEQ_PROP_1_0_REPLAY "Replay1_0"
 	
 	/**
 	 * SOAP Versions

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c Sun Jun 15 02:11:37 2008
@@ -337,6 +337,7 @@
         size = axutil_array_list_size(ack_range_list, env);
     }
 
+    /* Remove application sender beans from database that are acked */
     for(i = 0; i < size; i++)
     {
         sandesha2_ack_range_t *ack_range  = NULL;

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c Sun Jun 15 02:11:37 2008
@@ -2172,7 +2172,7 @@
     axis2_char_t *rm_ns_val = NULL;
     sandesha2_msg_number_t *msg_number = NULL;
     axis2_msg_ctx_t *req_msg = NULL;
-    axis2_char_t *str_identifier = NULL;
+    axis2_char_t *rms_sequence_id = NULL;
     sandesha2_sender_bean_t *app_msg_bean = NULL;
     long millisecs = 0;
     axutil_property_t *property = NULL;
@@ -2195,6 +2195,7 @@
     axutil_param_t *sleep_time_param = NULL;
     int sleep_time = 0;
     axis2_conf_t *conf = NULL;
+    const axis2_char_t *mep = NULL;
 
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,   
         "[Sandesha2] Entry:sandesha2_app_msg_processor_send_app_msg");
@@ -2223,6 +2224,13 @@
         AXIS2_SLEEP(1);
     }
 
+    if(rms_sequence_bean)
+    {
+        rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean, 
+                    env));
+        sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+    }
+
     if (to_bean)
     {
         to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(to_bean, env));
@@ -2287,6 +2295,10 @@
         {
             AXIS2_FREE(env->allocator, reply_to_addr);
         }
+        if(rms_sequence_id)
+        {
+            AXIS2_FREE(env->allocator, rms_sequence_id);
+        }
 
         return AXIS2_FAILURE;
     }
@@ -2325,6 +2337,10 @@
             {
                 AXIS2_FREE(env->allocator, reply_to_addr);
             }
+            if(rms_sequence_id)
+            {
+                AXIS2_FREE(env->allocator, rms_sequence_id);
+            }
 
             return AXIS2_FAILURE;
         }
@@ -2365,17 +2381,13 @@
         }
     }
 
-    if(!rms_sequence_bean || !sandesha2_seq_property_bean_get_value(rms_sequence_bean, env))
+    if(!rms_sequence_id)
     {
-        str_identifier = SANDESHA2_TEMP_SEQ_ID;
-    }
-    else
-    {
-        str_identifier = sandesha2_seq_property_bean_get_value(rms_sequence_bean, env);
+        rms_sequence_id = axutil_strdup(env, SANDESHA2_TEMP_SEQ_ID); /* Why should we do this?:damitha */
     }
         
     identifier = sandesha2_identifier_create(env, rm_ns_val);
-    sandesha2_identifier_set_identifier(identifier, env, str_identifier);
+    sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
     sandesha2_seq_set_identifier(seq, env, identifier);
     sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
 
@@ -2457,6 +2469,10 @@
         {
             AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
+        if(rms_sequence_id)
+        {
+            AXIS2_FREE(env->allocator, rms_sequence_id);
+        }
         
         return status;
     }
@@ -2474,7 +2490,7 @@
     sandesha2_sender_bean_set_msg_no(app_msg_bean, env, msg_num);
     sandesha2_sender_bean_set_msg_type(app_msg_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
 
-    if(!rms_sequence_bean || !sandesha2_seq_property_bean_get_value(rms_sequence_bean, env))
+    if(!rms_sequence_id)
     {
         sandesha2_sender_bean_set_send(app_msg_bean, env, AXIS2_FALSE);
     }
@@ -2485,7 +2501,15 @@
         axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
     }
 
+
     temp_op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
+    if(temp_op_ctx)
+    {
+        axis2_op_t *op = NULL;
+
+        op = axis2_op_ctx_get_op(temp_op_ctx, env);
+        mep = axis2_op_get_msg_exchange_pattern(op, env);
+    }
 
     /**
      * When we store application message context as below it should be noted
@@ -2519,6 +2543,10 @@
         {
             AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
+        if(rms_sequence_id)
+        {
+            AXIS2_FREE(env->allocator, rms_sequence_id);
+        }
 
         return AXIS2_FAILURE;
     }
@@ -2574,6 +2602,19 @@
         axis2_transport_sender_t *transport_sender = NULL;
         sandesha2_sender_bean_t *sender_bean = NULL;
 
+        if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
+        {
+            sandesha2_seq_property_bean_t *replay_bean = NULL;
+
+            replay_bean = sandesha2_seq_property_bean_create_with_data(env, rms_sequence_id, 
+                    SANDESHA2_SEQ_PROP_1_0_REPLAY, NULL);
+            sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, replay_bean);
+            if(replay_bean)
+            {
+                sandesha2_seq_property_bean_free(replay_bean, env);
+            }
+        }
+
         sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env, 
                 internal_sequence_id, msg_id);
         if(!sender_bean)
@@ -2600,6 +2641,9 @@
             {
                 continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_bean, 
                         conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+
+                sandesha2_sender_mgr_update(sender_mgr, env, app_msg_bean);
+
                 if(!continue_sending)
                 {
                     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
@@ -2660,6 +2704,10 @@
         {
             AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
+        if(rms_sequence_id)
+        {
+            AXIS2_FREE(env->allocator, rms_sequence_id);
+        }
 
         return status;
     }
@@ -2678,6 +2726,10 @@
     {
         AXIS2_FREE(env->allocator, from_acks_to_addr);
     }
+    if(rms_sequence_id)
+    {
+        AXIS2_FREE(env->allocator, rms_sequence_id);
+    }
 
     /* If not (single channel) spawn a thread and see whether acknowledgment has arrived through the 
      * sandesha2_sender_mgr_get_application_msg_to_send() function. If it has arrived exit from

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c Sun Jun 15 02:11:37 2008
@@ -76,22 +76,6 @@
     const axutil_env_t *env,
     axis2_char_t *name);
 
-static axis2_status_t
-sandesha2_terminate_mgr_resend(
-    const axutil_env_t *env,
-    axis2_conf_ctx_t *conf_ctx,
-    axis2_char_t *msg_id,
-    axis2_bool_t is_svr_side,
-    sandesha2_storage_mgr_t *storage_mgr,
-    sandesha2_seq_property_mgr_t *seq_prop_mgr,
-    sandesha2_create_seq_mgr_t *create_seq_mgr,
-    sandesha2_sender_mgr_t *sender_mgr);
-
-static axis2_bool_t AXIS2_CALL
-sandesha2_terminate_mgr_check_for_response_msg(
-    const axutil_env_t *env, 
-    axis2_msg_ctx_t *msg_ctx);
-
 static axis2_status_t AXIS2_CALL
 sandesha2_terminate_mgr_process_terminate_msg_response(
     const axutil_env_t *env, 
@@ -696,10 +680,6 @@
      */
     find_seq_prop_bean = sandesha2_seq_property_bean_create(env);
     sandesha2_seq_property_bean_set_seq_id(find_seq_prop_bean, env, internal_sequence_id);
-    if(internal_sequence_id)
-    {
-        AXIS2_FREE(env->allocator, internal_sequence_id);
-    }
 
     found_list = sandesha2_seq_property_mgr_find(seq_prop_mgr, env, find_seq_prop_bean);
     if(found_list)
@@ -769,7 +749,7 @@
 AXIS2_EXTERN axis2_status_t AXIS2_CALL
 sandesha2_terminate_mgr_send_terminate_seq_msg(
     const axutil_env_t *env,
-    sandesha2_msg_ctx_t *rm_msg_ctx,
+    sandesha2_msg_ctx_t *ack_rm_msg_ctx,
     axis2_char_t *rms_sequence_id,
     axis2_char_t *internal_sequence_id,
     sandesha2_storage_mgr_t *storage_mgr,
@@ -777,11 +757,11 @@
     sandesha2_create_seq_mgr_t *create_seq_mgr,
     sandesha2_sender_mgr_t *sender_mgr)
 {
-    axis2_msg_ctx_t *msg_ctx = NULL;
+    axis2_msg_ctx_t *ack_msg_ctx = NULL;
     axis2_msg_ctx_t *terminate_msg_ctx = NULL;
     axis2_conf_ctx_t *conf_ctx = NULL;
     sandesha2_seq_property_bean_t *terminated = NULL;
-    sandesha2_msg_ctx_t *terminate_rm_msg = NULL;
+    sandesha2_msg_ctx_t *terminate_rm_msg_ctx = NULL;
     axutil_property_t *property = NULL;
     axis2_endpoint_ref_t *to_epr = NULL;
     sandesha2_seq_property_bean_t *to_bean = NULL;
@@ -790,7 +770,7 @@
     axis2_char_t *key = NULL;
     sandesha2_sender_bean_t *terminate_bean = NULL;
     sandesha2_seq_property_bean_t *terminate_added = NULL;
-    axis2_msg_ctx_t *msg_ctx1 = NULL;
+    sandesha2_seq_property_bean_t *replay_bean = NULL;
     axis2_engine_t *engine = NULL;
     axis2_char_t *temp_action = NULL;
     axutil_string_t *soap_action = NULL;
@@ -806,7 +786,7 @@
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
         "[sandesha2] Entry:sandesha2_terminate_mgr_send_terminate_seq_msg");
 
-    AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
+    AXIS2_PARAM_CHECK(env->error, ack_rm_msg_ctx, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, rms_sequence_id, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, internal_sequence_id, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FAILURE);
@@ -814,8 +794,8 @@
     AXIS2_PARAM_CHECK(env->error, create_seq_mgr, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, sender_mgr, AXIS2_FAILURE);
     
-    msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
-    conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+    ack_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(ack_rm_msg_ctx, env);
+    conf_ctx = axis2_msg_ctx_get_conf_ctx(ack_msg_ctx, env);
     
     terminated = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rms_sequence_id, 
             SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
@@ -832,17 +812,17 @@
         return AXIS2_SUCCESS;
     }
 
-    terminate_rm_msg = sandesha2_msg_creator_create_terminate_seq_msg(env, rm_msg_ctx, 
+    terminate_rm_msg_ctx = sandesha2_msg_creator_create_terminate_seq_msg(env, ack_rm_msg_ctx, 
             rms_sequence_id, internal_sequence_id, seq_prop_mgr);
 
-    if(!terminate_rm_msg)
+    if(!terminate_rm_msg_ctx)
     {
         return AXIS2_FAILURE;
     }
 
-    sandesha2_msg_ctx_set_flow(terminate_rm_msg, env, AXIS2_OUT_FLOW);
+    sandesha2_msg_ctx_set_flow(terminate_rm_msg_ctx, env, AXIS2_OUT_FLOW);
     property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
-    sandesha2_msg_ctx_set_property(terminate_rm_msg, env, SANDESHA2_APPLICATION_PROCESSING_DONE, 
+    sandesha2_msg_ctx_set_property(terminate_rm_msg_ctx, env, SANDESHA2_APPLICATION_PROCESSING_DONE, 
             property);
     
     to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
@@ -859,20 +839,25 @@
         to_addr = axis2_endpoint_ref_get_address(to_epr, env);
     }
 
-    sandesha2_msg_ctx_set_to(terminate_rm_msg, env, to_epr);
+    sandesha2_msg_ctx_set_to(terminate_rm_msg_ctx, env, to_epr);
     rm_ver = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
     if(!rm_ver)
     {
         AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Cannot find the rm version for msg");
+        if(terminate_rm_msg_ctx)
+        {
+            sandesha2_msg_ctx_free(terminate_rm_msg_ctx, env);
+        }
+
         return AXIS2_FAILURE;
     }
 
-    sandesha2_msg_ctx_set_wsa_action(terminate_rm_msg, env, 
+    sandesha2_msg_ctx_set_wsa_action(terminate_rm_msg_ctx, env, 
         sandesha2_spec_specific_consts_get_terminate_seq_action(env, rm_ver));
 
     temp_action = sandesha2_spec_specific_consts_get_terminate_seq_soap_action(env, rm_ver);
     soap_action = axutil_string_create(env, temp_action);
-    sandesha2_msg_ctx_set_soap_action(terminate_rm_msg, env, soap_action);
+    sandesha2_msg_ctx_set_soap_action(terminate_rm_msg_ctx, env, soap_action);
     transport_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
             SANDESHA2_SEQ_PROP_TRANSPORT_TO);
 
@@ -880,28 +865,25 @@
     {
         axis2_char_t *value = sandesha2_seq_property_bean_get_value(transport_to_bean, env);
         property = axutil_property_create_with_args(env, 0, 0, 0, value);
-        sandesha2_msg_ctx_set_property(terminate_rm_msg, env, AXIS2_TRANSPORT_URL, property);
+        sandesha2_msg_ctx_set_property(terminate_rm_msg_ctx, env, AXIS2_TRANSPORT_URL, property);
     }
     
-    sandesha2_msg_ctx_add_soap_envelope(terminate_rm_msg, env);
-    /* If server side and single channel duplex mode send the terminate sequence
-     * message.
+    sandesha2_msg_ctx_add_soap_envelope(terminate_rm_msg_ctx, env);
+    
+    terminate_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg_ctx, env);
+
+    /* If RM 1.0 anonymous acks-to (single channel duplex mode) send the terminate sequence message.
      */
-    if(axis2_msg_ctx_get_server_side(msg_ctx, env) && sandesha2_utils_is_rm_1_0_anonymous_acks_to(
-                env, rm_ver, to_addr))
+    if(sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_ver, to_addr))
     {
-        axis2_msg_ctx_t *msg_ctx2 = NULL;
-
-        msg_ctx2 = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg, env);
-        is_svr_side = axis2_msg_ctx_get_server_side(msg_ctx2, env);
-        axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(msg_ctx2, env), env, AXIS2_TRUE);
-        axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
-        axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(msg_ctx, env), env, AXIS2_TRUE);
+        axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(terminate_msg_ctx, env), env, AXIS2_TRUE);
+        axis2_msg_ctx_set_paused(ack_msg_ctx, env, AXIS2_TRUE);
+        axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(ack_msg_ctx, env), env, AXIS2_TRUE);
         engine = axis2_engine_create(env, conf_ctx);
 
         AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] axis2_engine_send");
 
-        axis2_engine_send(engine, env, msg_ctx2);
+        axis2_engine_send(engine, env, terminate_msg_ctx);
         if(engine)
         {
             axis2_engine_free(engine, env);
@@ -909,22 +891,20 @@
         }
 
         /* Clean sending side data */
-        {
-            sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx, internal_sequence_id, 
-                    is_svr_side, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
-            terminate_added = sandesha2_seq_property_bean_create(env);
-
-            sandesha2_seq_property_bean_set_name(terminate_added, env, 
-                    SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
-
-            sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
-            sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
-            sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
+        sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx, internal_sequence_id, 
+                is_svr_side, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+        terminate_added = sandesha2_seq_property_bean_create(env);
 
-            if(terminate_added)
-            {
-                sandesha2_seq_property_bean_free(terminate_added, env);
-            }
+        sandesha2_seq_property_bean_set_name(terminate_added, env, 
+                SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
+
+        sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
+        sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
+        sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
+
+        if(terminate_added)
+        {
+            sandesha2_seq_property_bean_free(terminate_added, env);
         }
 
         if(rm_ver)
@@ -932,6 +912,11 @@
             AXIS2_FREE(env->allocator, rm_ver);
         }
 
+        if(terminate_rm_msg_ctx)
+        {
+            sandesha2_msg_ctx_free(terminate_rm_msg_ctx, env);
+        }
+
         return AXIS2_SUCCESS;
     }
 
@@ -943,14 +928,13 @@
     key = axutil_uuid_gen(env);
     terminate_bean = sandesha2_sender_bean_create(env);
     sandesha2_sender_bean_set_msg_ctx_ref_key(terminate_bean, env, key);
-    terminate_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg, env);
     sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, key, terminate_msg_ctx);
     property_bean = sandesha2_utils_get_property_bean(env, axis2_conf_ctx_get_conf(conf_ctx, env));
     terminate_delay = sandesha2_property_bean_get_terminate_delay(property_bean, env); 
     send_time = sandesha2_utils_get_current_time_in_millis(env) + terminate_delay;
     sandesha2_sender_bean_set_time_to_send(terminate_bean, env, send_time);
 
-    msg_id = sandesha2_msg_ctx_get_msg_id(terminate_rm_msg, env);
+    msg_id = sandesha2_msg_ctx_get_msg_id(terminate_rm_msg_ctx, env);
     sandesha2_sender_bean_set_msg_id(terminate_bean, env, msg_id);
 
     sandesha2_sender_bean_set_send(terminate_bean, env, AXIS2_TRUE);
@@ -962,304 +946,158 @@
                             
     sandesha2_sender_bean_set_resend(terminate_bean, env, AXIS2_FALSE);
     sandesha2_sender_mgr_insert(sender_mgr, env, terminate_bean);
-    if(terminate_bean)
-    {
-        sandesha2_sender_bean_free(terminate_bean, env);
-    }
     
-    terminate_added = sandesha2_seq_property_bean_create(env);
-    sandesha2_seq_property_bean_set_name(terminate_added, env, SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
-    sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
-    sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
-    sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
-
-    if(terminate_added)
-    {
-        sandesha2_seq_property_bean_free(terminate_added, env);
-    }
-
-    msg_ctx1 = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg, env);
-
     property = axutil_property_create_with_args(env, 0, AXIS2_TRUE, 0, key);
-    axis2_msg_ctx_set_property(msg_ctx1, env, SANDESHA2_MESSAGE_STORE_KEY, property);
+    axis2_msg_ctx_set_property(terminate_msg_ctx, env, SANDESHA2_MESSAGE_STORE_KEY, property);
                         
     property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
-    axis2_msg_ctx_set_property(msg_ctx1, env, SANDESHA2_SET_SEND_TO_TRUE, property);
+    axis2_msg_ctx_set_property(terminate_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
                         
-    reply_to_epr = axis2_msg_ctx_get_to(msg_ctx, env);
+    reply_to_epr = axis2_msg_ctx_get_to(ack_msg_ctx, env);
     if(reply_to_epr)
     {
-        axis2_msg_ctx_set_reply_to(msg_ctx1, env, reply_to_epr);
+        axis2_msg_ctx_set_reply_to(terminate_msg_ctx, env, reply_to_epr);
     }
 
-    if(!sandesha2_util_is_ack_already_piggybacked(env, terminate_rm_msg))
+    if(!sandesha2_util_is_ack_already_piggybacked(env, terminate_rm_msg_ctx))
     {
-        sandesha2_ack_mgr_piggyback_acks_if_present(env, terminate_rm_msg, storage_mgr, seq_prop_mgr, 
+        sandesha2_ack_mgr_piggyback_acks_if_present(env, terminate_rm_msg_ctx, storage_mgr, seq_prop_mgr, 
                 sender_mgr);
     }
 
-    is_svr_side = sandesha2_msg_ctx_get_server_side(rm_msg_ctx, env);
+    is_svr_side = sandesha2_msg_ctx_get_server_side(ack_rm_msg_ctx, env); /* Do we need this?:damitha */
     engine = axis2_engine_create(env, conf_ctx);
 
-    if(AXIS2_SUCCESS == axis2_engine_send(engine, env, msg_ctx1))
+    replay_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+        rms_sequence_id, SANDESHA2_SEQ_PROP_1_0_REPLAY);
+    if(replay_bean)
+    {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] replay on");
+    }
+    if(AXIS2_SUCCESS == axis2_engine_send(engine, env, terminate_msg_ctx))
     {
-        if(sandesha2_terminate_mgr_check_for_response_msg(env, msg_ctx1))
+        if(replay_bean)
         {
             axiom_soap_envelope_t *res_envelope = NULL;
             axis2_char_t *soap_ns_uri = NULL;
-
-            soap_ns_uri = axis2_msg_ctx_get_is_soap_11(msg_ctx1, env) ?
+            axis2_transport_out_desc_t *transport_out = NULL;
+            axis2_transport_sender_t *transport_sender = NULL;
+            
+            sandesha2_sender_bean_set_resend(terminate_bean, env, AXIS2_TRUE);
+            soap_ns_uri = axis2_msg_ctx_get_is_soap_11(terminate_msg_ctx, env) ?
                  AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
                  AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
 
-            res_envelope = axis2_msg_ctx_get_response_soap_envelope(msg_ctx1, env);
+            res_envelope = axis2_msg_ctx_get_response_soap_envelope(terminate_msg_ctx, env);
             if(!res_envelope)
             {
                 AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Response envelope not found");
 
                 res_envelope = (axiom_soap_envelope_t *) axis2_http_transport_utils_create_soap_msg(env, 
-                        msg_ctx1, soap_ns_uri);
+                        terminate_msg_ctx, soap_ns_uri);
             }
 
             if(res_envelope)
             {
-                status = sandesha2_terminate_mgr_process_terminate_msg_response(env, msg_ctx1, storage_mgr);
+                status = sandesha2_terminate_mgr_process_terminate_msg_response(env, terminate_msg_ctx, storage_mgr);
                 if(AXIS2_SUCCESS != status)
                 {
                     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
                         "[sandesha2] Terminate message response process failed for sequence %s", 
                         internal_sequence_id);
-
-                    if(engine)
-                    {
-                        axis2_engine_free(engine, env);
-                    }
-                    if(terminate_rm_msg)
-                    {
-                        sandesha2_msg_ctx_free(terminate_rm_msg, env);
-                    }
-
-                    return  status;
                 }
             }
+            transport_out = axis2_msg_ctx_get_transport_out_desc(terminate_msg_ctx, env);
+            if(transport_out)
+            {
+                transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+            }
 
             while(!res_envelope)
             {
+                axis2_bool_t continue_sending = AXIS2_FALSE;
                 long retrans_delay = -1;
 
-                retrans_delay = sandesha2_property_bean_get_retrans_interval(property_bean, env); 
-                AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "retrans_delay:%ld", retrans_delay);
-                AXIS2_SLEEP(retrans_delay);
+                continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, terminate_bean, 
+                        conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
 
-                status = sandesha2_terminate_mgr_resend(env, conf_ctx, msg_id, is_svr_side, 
-                        storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+                sandesha2_sender_mgr_update(sender_mgr, env, terminate_bean);
 
-                if(AXIS2_SUCCESS != status)
+                if(!continue_sending)
                 {
                     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                        "[sandesha2] Resend failed for message id %s in sequence %s", msg_id, 
-                        internal_sequence_id);
+                            "[sandesha2] Do not continue sending the terminate sequence message");
                     break;
                 }
-            }
-        }
-    }
 
-    if(engine)
-    {
-        axis2_engine_free(engine, env);
-    }
-
-    if(terminate_rm_msg)
-    {
-        sandesha2_msg_ctx_free(terminate_rm_msg, env);
-    }
-
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-            "[sandesha2] Exit:sandesha2_terminate_mgr_send_terminate_seq_msg");
+                retrans_delay = sandesha2_property_bean_get_retrans_interval(property_bean, env); 
+                AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "retrans_delay:%ld", retrans_delay);
+                AXIS2_SLEEP(retrans_delay);
 
-    return status;
-}
+                if(transport_sender)
+                {
+                    /* This is necessary to avoid a double free */
+                    axis2_msg_ctx_set_property(terminate_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
+                    if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, terminate_msg_ctx))
+                    {
+                        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                                "[sandesha2] Transport sender invoke failed in sending terminate sequence message");
+                    }
+                }
 
-static axis2_status_t
-sandesha2_terminate_mgr_resend(
-    const axutil_env_t *env,
-    axis2_conf_ctx_t *conf_ctx,
-    axis2_char_t *msg_id,
-    axis2_bool_t is_svr_side,
-    sandesha2_storage_mgr_t *storage_mgr,
-    sandesha2_seq_property_mgr_t *seq_prop_mgr,
-    sandesha2_create_seq_mgr_t *create_seq_mgr,
-    sandesha2_sender_mgr_t *sender_mgr)
-{
-    sandesha2_sender_bean_t *sender_worker_bean = NULL;
-    sandesha2_sender_bean_t *bean1 = NULL;
-    axis2_char_t *key = NULL;
-    axis2_bool_t continue_sending = AXIS2_TRUE;
-    axis2_msg_ctx_t *msg_ctx = NULL;
-    sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
-    axis2_transport_out_desc_t *transport_out = NULL;
-    axis2_transport_sender_t *transport_sender = NULL;
-    axis2_bool_t successfully_sent = AXIS2_FALSE;
-    axis2_status_t status = AXIS2_SUCCESS;
-    axis2_bool_t resend = AXIS2_FALSE;
+                status = sandesha2_terminate_mgr_process_terminate_msg_response(env, terminate_msg_ctx, storage_mgr);
+                if(AXIS2_SUCCESS != status)
+                {
+                    AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                        "[sandesha2] Terminate message response process failed for sequence %s", 
+                        internal_sequence_id);
 
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_terminate_mgr_resend");        
-    
-    sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
-    if(!sender_worker_bean)
-    {
-        AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
-        return AXIS2_FAILURE;
-    }
+                    break;
+                }
 
-    key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_worker_bean, env);
-    if(is_svr_side)
-    {
-        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Retrieving msg_ctx from database");
-        msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx, AXIS2_TRUE);
-    }
-    else
-    {
-        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                "[sandesha2] Retrieving msg_ctx from configuration context");
-        msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx, 
-                AXIS2_FALSE);
-    }
+                res_envelope = axis2_msg_ctx_get_response_soap_envelope(terminate_msg_ctx, env);
+            }
 
-    if(!msg_ctx)
-    {
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx is not present in the store.");
-        if(sender_worker_bean)
-        {
-            sandesha2_sender_bean_free(sender_worker_bean, env);
+            sandesha2_seq_property_bean_free(replay_bean, env);
         }
-
-        return AXIS2_FAILURE;
     }
 
-    continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean, 
-            conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
-    sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
-    if(!continue_sending)
-    {
-        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Do not continue sending the message");
-        if(sender_worker_bean)
-        {
-            sandesha2_sender_bean_free(sender_worker_bean, env);
-        }
-
-        return AXIS2_FAILURE;
-    }
-    
-    rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
-    
-    if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
-    {
-        sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx, storage_mgr, seq_prop_mgr, 
-                sender_mgr);
-    }
-    
-    transport_out = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
-    if(transport_out)
-    {
-        transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
-    }
-    if(transport_sender)
+    if(terminate_bean)
     {
-        /* This is neccessary to avoid a double free */
-        axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
-        if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx))
-		{
-        	successfully_sent = AXIS2_TRUE;
-		}else
-		{
-        	successfully_sent = AXIS2_FALSE;
-		}
+        sandesha2_sender_bean_free(terminate_bean, env);
     }
 
-    msg_id = sandesha2_sender_bean_get_msg_id(sender_worker_bean, env);
-    bean1 = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
-    if(bean1)
-    { 
-        resend = sandesha2_sender_bean_is_resend(sender_worker_bean, env);
-        if(resend)
-        {
-            sandesha2_sender_bean_set_sent_count(bean1, env, 
-                sandesha2_sender_bean_get_sent_count(sender_worker_bean, env));
-            sandesha2_sender_bean_set_time_to_send(bean1, env, 
-                sandesha2_sender_bean_get_time_to_send(sender_worker_bean, env));
-            sandesha2_sender_mgr_update(sender_mgr, env, bean1);
-        }
-    }
+    sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx, internal_sequence_id, is_svr_side, 
+            storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+    terminate_added = sandesha2_seq_property_bean_create(env);
 
-    if(sender_worker_bean)
-    {
-        sandesha2_sender_bean_free(sender_worker_bean, env);
-    }
+    sandesha2_seq_property_bean_set_name(terminate_added, env, SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
 
-    if(successfully_sent)
-    {
-        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-            "[sandesha2] Message successfully sent");
-        if(sandesha2_terminate_mgr_check_for_response_msg(env, msg_ctx))
-        {
-            status = sandesha2_terminate_mgr_process_terminate_msg_response(env, msg_ctx, storage_mgr);
-            if(AXIS2_SUCCESS != status)
-            {
-                return status;
-            }
-        }
-    }
+    sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
+    sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
+    sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
 
-    if(bean1)
+    if(terminate_added)
     {
-        sandesha2_sender_bean_free(bean1, env);
+        sandesha2_seq_property_bean_free(terminate_added, env);
     }
 
-    if(rm_msg_ctx)
+    if(engine)
     {
-        sandesha2_msg_ctx_free(rm_msg_ctx, env);
+        axis2_engine_free(engine, env);
     }
 
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Exit:sandesha2_terminate_mgr_resend");
-
-    return status;
-}
-
-static axis2_bool_t AXIS2_CALL
-sandesha2_terminate_mgr_check_for_response_msg(
-    const axutil_env_t *env, 
-    axis2_msg_ctx_t *msg_ctx)
-{
-    axis2_bool_t svr_side = AXIS2_FALSE;
-    axis2_char_t *soap_ns_uri = NULL;
-   
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
-        "[sandesha2] Entry:sandesha2_terminate_mgr_check_for_response_msg");
-    
-    AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
-    
-    svr_side = axis2_msg_ctx_get_server_side(msg_ctx, env);
-    soap_ns_uri = axis2_msg_ctx_get_is_soap_11(msg_ctx, env) ?
-         AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
-         AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
-
-    if(svr_side)
+    if(terminate_rm_msg_ctx)
     {
-        /* We check and process the sync response only in the application client 
-         * side.
-         */
-        return AXIS2_FALSE;
+        sandesha2_msg_ctx_free(terminate_rm_msg_ctx, env);
     }
 
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
-        "[sandesha2] Exit:sandesha2_terminate_mgr_check_for_response_msg");
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Exit:sandesha2_terminate_mgr_send_terminate_seq_msg");
 
-    return AXIS2_TRUE;
+    return status;
 }
 
-
 static axis2_status_t AXIS2_CALL
 sandesha2_terminate_mgr_process_terminate_msg_response(
     const axutil_env_t *env, 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org