You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/10/09 18:57:56 UTC

svn commit: r703207 - in /webservices/sandesha/trunk/c/src: msgprocessors/app_msg_processor.c transport/sandesha2_transport_sender.c

Author: damitha
Date: Thu Oct  9 09:57:56 2008
New Revision: 703207

URL: http://svn.apache.org/viewvc?rev=703207&view=rev
Log:
Now Sandesha2/C stores the application message in the database before sending it when the sequence is dual channel.

Modified:
    webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
    webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c

Modified: webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c?rev=703207&r1=703206&r2=703207&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c Thu Oct  9 09:57:56 2008
@@ -89,6 +89,7 @@
     int retrans_interval;
     void *bean;
     void *msg_ctx;
+    sandesha2_seq_t *sequence;
 };
 
 static void AXIS2_CALL                 
@@ -100,12 +101,12 @@
     long msg_num,
     sandesha2_seq_property_mgr_t *seq_prop_mgr);
 
-/*static void * AXIS2_THREAD_FUNC
+static void * AXIS2_THREAD_FUNC
 sandesha2_app_msg_processor_create_seq_msg_worker_function(
     axutil_thread_t *thd, 
-    void *data);*/
+    void *data);
 
-/*static axis2_status_t
+static axis2_status_t
 sandesha2_app_msg_processor_start_create_seq_msg_resender(
     const axutil_env_t *env,
     axis2_conf_ctx_t *conf_ctx,
@@ -114,7 +115,7 @@
     const axis2_bool_t is_server_side,
     int retrans_interval,
     sandesha2_sender_bean_t *create_sequence_sender_bean,
-    axis2_msg_ctx_t *create_seq_msg_ctx);*/
+    axis2_msg_ctx_t *create_seq_msg_ctx);
 
 static void * AXIS2_THREAD_FUNC
 sandesha2_app_msg_processor_application_msg_worker_function(
@@ -129,7 +130,8 @@
     axis2_char_t *msg_id,
     const axis2_bool_t is_server_side,
     int retrans_interval,
-    axis2_msg_ctx_t *app_msg_ctx);
+    axis2_msg_ctx_t *app_msg_ctx,
+    sandesha2_seq_t *sequence);
 
 static axis2_status_t AXIS2_CALL 
 sandesha2_app_msg_processor_process_in_msg (
@@ -1598,6 +1600,8 @@
 
             if(AXIS2_SUCCESS != status)
             {
+                /* Pause the message context so that it won't be sent by the transport sender */
+                axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
                 AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                     "[sandesha2] Could not send create sequence message");
                 
@@ -2375,8 +2379,8 @@
      * 3. reply_to_addr is anonymous
      * go into the following loop.
      */
-    /*if(!is_svr_side && (listener_manager || !reply_to_addr || sandesha2_utils_is_anon_uri(env, 
-                    reply_to_addr)))*/
+    if(!is_svr_side && (listener_manager || !reply_to_addr || sandesha2_utils_is_anon_uri(env, 
+                    reply_to_addr)))
     {
         if(axis2_engine_send(engine, env, create_seq_msg_ctx))
         {
@@ -2459,17 +2463,25 @@
             axis2_msg_ctx_free(create_seq_msg_ctx, env);
         }
     }
-    /*else
+    else
     {
-        if(axis2_engine_send(engine, env, create_seq_msg_ctx))
-        {
-            if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
-            {
-                status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx, 
-                        storage_mgr);
-            }
-        }
-        else
+        /* This is a trick to make the msg_ctx traverse all the out phases.
+         * Once all the phases have been passed, it reaches the dummy sandesha2 transport
+         * sender, which simply restores the original transport out description.
+         */
+
+        axutil_property_t *property = NULL;
+        axis2_transport_out_desc_t *orig_transport_out = NULL;
+        axis2_transport_out_desc_t *sandesha2_transport_out = NULL;
+
+        orig_transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
+        property = axutil_property_create_with_args(env, 0, 0, 0, orig_transport_out);
+        axis2_msg_ctx_set_property(create_seq_msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC, 
+                property);
+        sandesha2_transport_out = sandesha2_utils_get_transport_out(env);
+        axis2_msg_ctx_set_transport_out_desc(create_seq_msg_ctx, env, sandesha2_transport_out);
+
+        if(!axis2_engine_send(engine, env, create_seq_msg_ctx))
         {
             AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Engine Send failed");
         }
@@ -2479,12 +2491,13 @@
             axis2_engine_free(engine, env);
         }
 
-        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
-            SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
-        // Dual channel 
-        sandesha2_app_msg_processor_start_create_seq_msg_resender(env, conf_ctx, internal_sequence_id, 
-            msg_id, is_svr_side, retrans_interval, create_sequence_sender_bean, create_seq_msg_ctx);
-    }*/
+        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+                internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+        /* Dual channel */
+        status = sandesha2_app_msg_processor_start_create_seq_msg_resender(env, conf_ctx, 
+                internal_sequence_id, msg_id, is_svr_side, retrans_interval, 
+                create_sequence_sender_bean, create_seq_msg_ctx);
+    }
 
     if(rm_version)
     {
@@ -2497,7 +2510,7 @@
     return status;
 }
 
-/*static axis2_status_t
+static axis2_status_t
 sandesha2_app_msg_processor_start_create_seq_msg_resender(
     const axutil_env_t *env,
     axis2_conf_ctx_t *conf_ctx,
@@ -2538,9 +2551,9 @@
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
             "[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
     return AXIS2_SUCCESS;
-}*/
+}
 
-/*static void * AXIS2_THREAD_FUNC
+static void * AXIS2_THREAD_FUNC
 sandesha2_app_msg_processor_create_seq_msg_worker_function(
     axutil_thread_t *thd, 
     void *data)
@@ -2558,7 +2571,7 @@
     axis2_bool_t is_server_side = AXIS2_FALSE;
     sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
     axis2_char_t *msg_id = NULL;
-    // sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+    /* sandesha2_seq_property_bean_t *rms_sequence_bean = NULL; */
     axis2_bool_t continue_sending = AXIS2_TRUE;
     axis2_transport_out_desc_t *transport_out = NULL;
     axis2_transport_sender_t *transport_sender = NULL;
@@ -2589,8 +2602,6 @@
     create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
     sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
 
-    AXIS2_SLEEP(retrans_interval);
-
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "dam_internal_sequence_id:%s", internal_sequence_id);
 
     find_sender_bean = sandesha2_sender_bean_create(env);
@@ -2636,11 +2647,9 @@
             break;
         }
 
-        AXIS2_SLEEP(retrans_interval);
-
         if(transport_sender)
         {
-            // This is neccessary to avoid a double free
+            /* This is necessary to avoid a double free */
             axis2_msg_ctx_set_property(create_seq_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
             if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
             {
@@ -2648,6 +2657,7 @@
             }
         }
 
+        AXIS2_SLEEP(retrans_interval);
         sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
     }
 
@@ -2687,7 +2697,7 @@
     axutil_allocator_switch_to_local_pool(env->allocator);
 
     return NULL;
-}*/
+}
 
 static axis2_status_t AXIS2_CALL
 sandesha2_app_msg_processor_process_create_seq_response(
@@ -2794,7 +2804,7 @@
     sandesha2_seq_property_bean_t *to_bean = NULL;
     sandesha2_seq_property_bean_t *reply_to_bean = NULL;
     sandesha2_seq_property_bean_t *from_acks_to_bean = NULL;
-    sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+    /*sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;*/
     axis2_endpoint_ref_t *to_epr = NULL;
     axis2_endpoint_ref_t *reply_to_epr = NULL;
     axis2_char_t *from_acks_to_addr = NULL;
@@ -2807,12 +2817,12 @@
     axis2_char_t *rm_ns_val = NULL;
     sandesha2_msg_number_t *msg_number = NULL;
     axis2_msg_ctx_t *req_msg = NULL;
-    axis2_char_t *rms_sequence_id = NULL;
+    /*axis2_char_t *rms_sequence_id = NULL;*/
     sandesha2_sender_bean_t *app_msg_sender_bean = NULL;
     long millisecs = 0;
     /*axutil_property_t *property = NULL;*/
     axis2_engine_t *engine = NULL;
-    sandesha2_identifier_t *identifier = NULL;
+    /*sandesha2_identifier_t *identifier = NULL;*/
     axis2_char_t *msg_id = NULL;
     axis2_bool_t last_msg = AXIS2_FALSE;
     axis2_op_ctx_t *temp_op_ctx = NULL;
@@ -2828,7 +2838,7 @@
     axis2_conf_t *conf = NULL;
     const axis2_char_t *mep = NULL;
     axis2_relates_to_t *relates_to = NULL;
-    sandesha2_seq_property_bean_t *relates_to_bean = NULL;
+    /*sandesha2_seq_property_bean_t *relates_to_bean = NULL;*/
     axis2_svc_t *svc = NULL;
     sandesha2_property_bean_t *property_bean = NULL;
 
@@ -2877,7 +2887,7 @@
     reply_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id, 
             SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
     
-    rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+    /*rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
             internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
 
     while(!rms_sequence_bean)
@@ -2892,7 +2902,7 @@
         rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean, 
                     env));
         sandesha2_seq_property_bean_free(rms_sequence_bean, env);
-    }
+    }*/
 
     if (to_bean)
     {
@@ -2960,10 +2970,10 @@
             AXIS2_FREE(env->allocator, reply_to_addr);
         }
 
-        if(rms_sequence_id)
+        /*if(rms_sequence_id)
         {
             AXIS2_FREE(env->allocator, rms_sequence_id);
-        }
+        }*/
 
         return AXIS2_FAILURE;
     }
@@ -3003,10 +3013,10 @@
                 AXIS2_FREE(env->allocator, reply_to_addr);
             }
 
-            if(rms_sequence_id)
+            /*if(rms_sequence_id)
             {
                 AXIS2_FREE(env->allocator, rms_sequence_id);
-            }
+            }*/
 
             return AXIS2_FAILURE;
         }
@@ -3047,19 +3057,19 @@
         }
     }
 
-    if(!rms_sequence_id)
+    /*if(!rms_sequence_id)
     {
-        rms_sequence_id = axutil_strdup(env, SANDESHA2_TEMP_SEQ_ID); /* Why should we do this?:damitha */
-    }
+        rms_sequence_id = axutil_strdup(env, SANDESHA2_TEMP_SEQ_ID); // Why should we do this?:damitha
+    }*/
         
-    identifier = sandesha2_identifier_create(env, rm_ns_val);
+    /*identifier = sandesha2_identifier_create(env, rm_ns_val);
     sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
     sandesha2_seq_set_identifier(seq, env, identifier);
-    sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
+    sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);*/
 
     /* TODO add_ack_requested */
 
-    sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+    /*sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);*/
    
     op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
     if(op_ctx)
@@ -3110,17 +3120,16 @@
     sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
     sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
     sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
+    sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
 
-    if(!rms_sequence_id)
+    /*if(!rms_sequence_id)
     {
         sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_FALSE);
     }
     else
     {
         sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
-        /*property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
-        axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);*/
-    }
+    }*/
 
     sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
 
@@ -3136,6 +3145,36 @@
     if(is_svr_side && sandesha2_utils_is_anon_uri(env, from_acks_to_addr) && (!to_addr || 
             sandesha2_utils_is_anon_uri(env, to_addr)))
     {
+        sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+        axis2_char_t *rms_sequence_id = NULL;
+        sandesha2_identifier_t *identifier = NULL;
+
+        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+                internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+        while(!rms_sequence_bean)
+        {
+            rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+                    internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+            AXIS2_SLEEP(1);
+        }
+
+        if(rms_sequence_bean)
+        {
+            rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean, 
+                        env));
+            sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+        }
+
+        identifier = sandesha2_identifier_create(env, rm_ns_val);
+        sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
+        sandesha2_seq_set_identifier(seq, env, identifier);
+        sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
+
+        /* TODO add_ack_requested */
+
+        sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+
         sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
         if(req_rm_msg_ctx)
         {
@@ -3222,10 +3261,10 @@
             AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
 
-        if(rms_sequence_id)
+        /*if(rms_sequence_id)
         {
             AXIS2_FREE(env->allocator, rms_sequence_id);
-        }
+        }*/
 
         return AXIS2_FAILURE;
     }
@@ -3233,13 +3272,8 @@
     axis2_msg_ctx_set_current_handler_index(app_msg_ctx, env, 
             axis2_msg_ctx_get_current_handler_index(app_msg_ctx, env) + 1);
 
-    if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
-    {
-        sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr, 
-                seq_prop_mgr, sender_mgr);
-    }
 
-    if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
+    /*if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
     {
         if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
         {
@@ -3253,26 +3287,83 @@
                 sandesha2_seq_property_bean_free(replay_bean, env);
             }
         }
-    }
+    }*/
 
     conf = axis2_conf_ctx_get_conf(conf_ctx, env);
     retrans_interval = sandesha2_property_bean_get_retrans_interval(property_bean, env);
 
-    relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id, 
+    /*relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id, 
             SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
     if(relates_to_bean)
     {
         sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
         sandesha2_seq_property_bean_free(relates_to_bean, env);
-    }
+    }*/
 
-    if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
+    if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_anon_uri(env, reply_to_addr)))
     {
         /* Client side and oneway */
         axis2_transport_out_desc_t *transport_out = NULL;
         axis2_transport_sender_t *transport_sender = NULL;
         sandesha2_sender_bean_t *sender_bean = NULL;
+        sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+        axis2_char_t *rms_sequence_id = NULL;
+        sandesha2_identifier_t *identifier = NULL;
+        sandesha2_seq_property_bean_t *relates_to_bean = NULL;
+        
+        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+                internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+        while(!rms_sequence_bean)
+        {
+            rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+                    internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+            AXIS2_SLEEP(1);
+        }
+
+        if(rms_sequence_bean)
+        {
+            rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean, 
+                        env));
+            sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+        }
+
+        relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id, 
+                SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
+        if(relates_to_bean)
+        {
+            sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
+            sandesha2_seq_property_bean_free(relates_to_bean, env);
+        }
+
+        if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
+        {
+            sandesha2_seq_property_bean_t *replay_bean = NULL;
+
+            replay_bean = sandesha2_seq_property_bean_create_with_data(env, rms_sequence_id, 
+                    SANDESHA2_SEQ_PROP_1_0_REPLAY, NULL);
+            sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, replay_bean);
+            if(replay_bean)
+            {
+                sandesha2_seq_property_bean_free(replay_bean, env);
+            }
+        }
+
+        if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
+        {
+            sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr, 
+                    seq_prop_mgr, sender_mgr);
+        }
+
+        identifier = sandesha2_identifier_create(env, rm_ns_val);
+        sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
+        sandesha2_seq_set_identifier(seq, env, identifier);
+        sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
         
+        /* TODO add_ack_requested */
+
+        sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+
         engine = axis2_engine_create(env, conf_ctx);
         if(axis2_engine_resume_send(engine, env, app_msg_ctx))
         {
@@ -3393,16 +3484,32 @@
     }
     else /* Not client side or twoway client side*/
     {
+        /* This is a trick to make the msg_ctx traverse all the out phases.
+         * Once all the phases have been passed, it reaches the dummy sandesha2 transport
+         * sender, which simply restores the original transport out description.
+         */
+
+        axutil_property_t *property = NULL;
+        axis2_transport_out_desc_t *orig_transport_out = NULL;
+        axis2_transport_out_desc_t *sandesha2_transport_out = NULL;
+
+        orig_transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
+        property = axutil_property_create_with_args(env, 0, 0, 0, orig_transport_out);
+        axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC, 
+                property);
+        sandesha2_transport_out = sandesha2_utils_get_transport_out(env);
+        axis2_msg_ctx_set_transport_out_desc(app_msg_ctx, env, sandesha2_transport_out);
+
         axis2_msg_ctx_increment_ref(app_msg_ctx, env);
         engine = axis2_engine_create(env, conf_ctx);
-        if(axis2_engine_resume_send(engine, env, app_msg_ctx))
-        {
+        if(!axis2_engine_resume_send(engine, env, app_msg_ctx))
+        /*{
             if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
             {    
                 status = sandesha2_app_msg_processor_process_app_msg_response(env, app_msg_ctx);
             }
         }
-        else
+        else*/
         {
             AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] Engine resume send failed");
         }
@@ -3417,8 +3524,8 @@
         /* If not (single channel) spawn a thread and see whether acknowledgment has arrived through the 
          * sandesha2_sender_mgr_get_application_msg_to_send() function. If it has arrived exit from
          * the thread.*/
-        sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx, internal_sequence_id, 
-                msg_id, is_svr_side, retrans_interval, app_msg_ctx);
+        status = sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx, 
+                internal_sequence_id, msg_id, is_svr_side, retrans_interval, app_msg_ctx, seq);
     }
    
     if(rm_version)
@@ -3435,10 +3542,10 @@
     {
         AXIS2_FREE(env->allocator, from_acks_to_addr);
     }
-    if(rms_sequence_id)
+    /*if(rms_sequence_id)
     {
         AXIS2_FREE(env->allocator, rms_sequence_id);
-    }
+    }*/
 
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[Sandesha2] Exit:sandesha2_app_msg_processor_send_app_msg");
 
@@ -3453,7 +3560,8 @@
     axis2_char_t *msg_id,
     const axis2_bool_t is_server_side,
     int retrans_interval,
-    axis2_msg_ctx_t *app_msg_ctx)
+    axis2_msg_ctx_t *app_msg_ctx,
+    sandesha2_seq_t *sequence)
 {
     axutil_thread_t *worker_thread = NULL;
     sandesha2_app_msg_processor_args_t *args = NULL;
@@ -3469,6 +3577,7 @@
     args->retrans_interval = retrans_interval;
     args->is_server_side = is_server_side;
     args->msg_ctx = app_msg_ctx;
+    args->sequence = sequence;
 
     worker_thread = axutil_thread_pool_get_thread(env->thread_pool, 
             sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
@@ -3505,6 +3614,17 @@
     sandesha2_sender_bean_t *sender_bean = NULL;
     axis2_char_t *msg_id = NULL;
     axis2_status_t status = AXIS2_FAILURE;
+    axis2_svc_t *svc = NULL;
+    axis2_char_t *key = NULL;
+    axis2_msg_ctx_t *app_msg_ctx = NULL;
+    sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+    axis2_char_t *rms_sequence_id = NULL;
+    sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
+    sandesha2_identifier_t *identifier = NULL;
+    sandesha2_seq_property_bean_t *relates_to_bean = NULL;
+    axis2_char_t *rm_version = NULL;
+    axis2_char_t *rm_ns_val = NULL;
+    sandesha2_seq_t *sequence = NULL;
 
     args = (sandesha2_app_msg_processor_args_t*) data;
     env = args->env;
@@ -3513,6 +3633,7 @@
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
         "[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
     conf_ctx = args->conf_ctx;
+    sequence = args->sequence;
     msg_id = axutil_strdup(env, args->msg_id);
     internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
     is_server_side = args->is_server_side;
@@ -3523,7 +3644,6 @@
     create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
     sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
 
-    AXIS2_SLEEP(retrans_interval);
     sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env, 
             internal_sequence_id, msg_id);
     if(!sender_bean)
@@ -3534,12 +3654,91 @@
         return NULL;
     }
 
+    rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+            internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+    key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
+    app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx, 
+        AXIS2_TRUE);
+    svc = axis2_msg_ctx_get_svc(app_msg_ctx, env);
+
+    while(!rms_sequence_bean)
+    {
+        axis2_bool_t continue_sending = AXIS2_TRUE;
+
+        continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean, 
+                conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
+        sandesha2_sender_mgr_update(sender_mgr, env, sender_bean);
+        if(!continue_sending)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                    "[sandesha2] Do not continue sending the application message");
+            if(sender_bean)
+            {
+                sandesha2_sender_bean_free(sender_bean, env);
+            }
+            
+            if(app_msg_ctx)
+            {
+                axis2_msg_ctx_free(app_msg_ctx, env);
+            }
+
+            return NULL;
+        }
+
+        rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
+                internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+        AXIS2_SLEEP(1);
+    }
+
+    if(rms_sequence_bean)
+    {
+        rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean, 
+                    env));
+        sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+    }
+
+    relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id, 
+            SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
+    if(relates_to_bean)
+    {
+        sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
+        sandesha2_seq_property_bean_free(relates_to_bean, env);
+    }
+
+    rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
+
+    rm_version = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
+    if(!rm_version)
+    {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                "[sandesha2] Unable to find RM spec version for the rms internal_sequence_id %s", 
+                internal_sequence_id);
+
+        return NULL;
+    }
+
+    rm_ns_val = sandesha2_spec_specific_consts_get_rm_ns_val(env, rm_version);
+
+    identifier = sandesha2_identifier_create(env, rm_ns_val);
+    sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
+    sandesha2_seq_set_identifier(sequence, env, identifier);
+    sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, sequence);
+    sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+        
+    /* TODO add_ack_requested */
+
+    if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
+    {
+        sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr, 
+                seq_prop_mgr, sender_mgr);
+    }
+
     while(AXIS2_TRUE)
     {
-        axis2_char_t *key = NULL;
-        axis2_msg_ctx_t *app_msg_ctx = NULL;
+        /*axis2_char_t *key = NULL;
+        axis2_msg_ctx_t *app_msg_ctx = NULL;*/
 
-        AXIS2_SLEEP(retrans_interval);
         sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env, 
                 internal_sequence_id, msg_id);
         if(!sender_bean)
@@ -3550,7 +3749,7 @@
             break;
         }
 
-        key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
+        /*key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
         app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx, 
                 AXIS2_TRUE);
 
@@ -3559,14 +3758,15 @@
             AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
                     "[sandesha2] msg_ctx is not present in the store yet.");
 
-            /*msg_ctx is still not stored so try again later.*/
+            // msg_ctx is still not stored so try again later.
             if(sender_bean)
             {
                 sandesha2_sender_bean_free(sender_bean, env);
             }
 
             break;
-        }
+        }*/
+
         status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
                 internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr, 
                 sender_mgr, app_msg_ctx);
@@ -3577,11 +3777,6 @@
                 "[sandesha2] Resend failed for  message id %s in sequence %s", msg_id, 
                 internal_sequence_id);
 
-            if(app_msg_ctx)
-            {
-                axis2_msg_ctx_free(app_msg_ctx, env);
-            }
-
             if(sender_bean)
             {
                 sandesha2_sender_bean_free(sender_bean, env); 
@@ -3589,15 +3784,12 @@
             break;
         }
 
-        if(app_msg_ctx)
-        {
-            axis2_msg_ctx_free(app_msg_ctx, env);
-        }
-
         if(sender_bean)
         {
             sandesha2_sender_bean_free(sender_bean, env); 
         }
+
+        AXIS2_SLEEP(retrans_interval);
     }
 
     /*if(internal_sequence_id)
@@ -3610,6 +3802,16 @@
         AXIS2_FREE(env->allocator, msg_id);
     }*/
 
+    if(app_msg_ctx)
+    {
+        axis2_msg_ctx_free(app_msg_ctx, env);
+    }
+
+    if(rms_sequence_id)
+    {
+        AXIS2_FREE(env->allocator, rms_sequence_id);
+    }
+
     if(storage_mgr)
     {
         sandesha2_storage_mgr_free(storage_mgr, env);

Modified: webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c?rev=703207&r1=703206&r2=703207&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c (original)
+++ webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c Thu Oct  9 09:57:56 2008
@@ -126,32 +126,36 @@
     axutil_property_t *property = NULL;
     axis2_transport_out_desc_t *out_desc = NULL;
     axis2_transport_out_desc_t *temp_out_desc = NULL;
-    axis2_char_t *key = NULL;
-    axis2_conf_ctx_t *conf_ctx = NULL;
+    /*axis2_conf_ctx_t *conf_ctx = NULL;
     axis2_conf_t *conf = NULL;
+    axis2_char_t *key = NULL;
     sandesha2_storage_mgr_t *storage_mgr = NULL;
-    axis2_char_t *dbname = NULL;
+    axis2_char_t *dbname = NULL;*/
     
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-         "[sandesha2]Entry:sandesha2_transport_sender_invoke");
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_transport_sender_invoke");
     AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
-    property = axis2_msg_ctx_get_property(msg_ctx, env, 
-        SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC);
-    if(NULL == property || NULL == axutil_property_get_value(property, env))
+
+    property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC);
+    if(!property || !axutil_property_get_value(property, env))
+    {
         return AXIS2_FAILURE;
+    }
+
     out_desc = axutil_property_get_value(property, env);
     temp_out_desc = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
     if(temp_out_desc)
+    {
         axis2_transport_out_desc_free(temp_out_desc, env);
+    }
+
     axis2_msg_ctx_set_transport_out_desc(msg_ctx, env, out_desc);
     
-    property = axis2_msg_ctx_get_property(msg_ctx, env, 
-        SANDESHA2_MESSAGE_STORE_KEY);
+    /*property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_MESSAGE_STORE_KEY);
                         
-    if(NULL == property || NULL == axutil_property_get_value(property, env))
+    if(!property || !axutil_property_get_value(property, env))
     {
         AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-            "[sandesha2]SANDESHA2_MESSAGE_STORE_KEY property is NULL");
+                "[sandesha2] SANDESHA2_MESSAGE_STORE_KEY property is NULL");
         return AXIS2_FAILURE;
     }
     
@@ -162,13 +166,14 @@
     storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
     
     property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
-    axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_QUALIFIED_FOR_SENDING,
-        property);
+    axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_QUALIFIED_FOR_SENDING, property);
     sandesha2_storage_mgr_update_msg_ctx(storage_mgr, env, key, msg_ctx);
     if(storage_mgr)
+    {
         sandesha2_storage_mgr_free(storage_mgr, env);
-    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
-         "[sandesha2]Exit:sandesha2_transport_sender_invoke");
+    }*/
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Exit:sandesha2_transport_sender_invoke");
     return AXIS2_SUCCESS;
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org