You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/10/09 18:57:56 UTC
svn commit: r703207 - in /webservices/sandesha/trunk/c/src:
msgprocessors/app_msg_processor.c transport/sandesha2_transport_sender.c
Author: damitha
Date: Thu Oct 9 09:57:56 2008
New Revision: 703207
URL: http://svn.apache.org/viewvc?rev=703207&view=rev
Log:
Sandesha2/C now stores the application message in the database before sending it when operating in dual-channel mode.
Modified:
webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c
Modified: webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c?rev=703207&r1=703206&r2=703207&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c Thu Oct 9 09:57:56 2008
@@ -89,6 +89,7 @@
int retrans_interval;
void *bean;
void *msg_ctx;
+ sandesha2_seq_t *sequence;
};
static void AXIS2_CALL
@@ -100,12 +101,12 @@
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr);
-/*static void * AXIS2_THREAD_FUNC
+static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_create_seq_msg_worker_function(
axutil_thread_t *thd,
- void *data);*/
+ void *data);
-/*static axis2_status_t
+static axis2_status_t
sandesha2_app_msg_processor_start_create_seq_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
@@ -114,7 +115,7 @@
const axis2_bool_t is_server_side,
int retrans_interval,
sandesha2_sender_bean_t *create_sequence_sender_bean,
- axis2_msg_ctx_t *create_seq_msg_ctx);*/
+ axis2_msg_ctx_t *create_seq_msg_ctx);
static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_application_msg_worker_function(
@@ -129,7 +130,8 @@
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval,
- axis2_msg_ctx_t *app_msg_ctx);
+ axis2_msg_ctx_t *app_msg_ctx,
+ sandesha2_seq_t *sequence);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_in_msg (
@@ -1598,6 +1600,8 @@
if(AXIS2_SUCCESS != status)
{
+ /* Pause the message contex so that it won't be sent at transport sender */
+ axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Could not send create sequence message");
@@ -2375,8 +2379,8 @@
* 3. reply_to_addr is anonymous
* go into the following loop.
*/
- /*if(!is_svr_side && (listener_manager || !reply_to_addr || sandesha2_utils_is_anon_uri(env,
- reply_to_addr)))*/
+ if(!is_svr_side && (listener_manager || !reply_to_addr || sandesha2_utils_is_anon_uri(env,
+ reply_to_addr)))
{
if(axis2_engine_send(engine, env, create_seq_msg_ctx))
{
@@ -2459,17 +2463,25 @@
axis2_msg_ctx_free(create_seq_msg_ctx, env);
}
}
- /*else
+ else
{
- if(axis2_engine_send(engine, env, create_seq_msg_ctx))
- {
- if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
- {
- status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx,
- storage_mgr);
- }
- }
- else
+ /* This is actually a trick that get the msg_ctx traversed through all the out phases.
+ * Once all the phases are passed it will get hit into the false sandesha2 transport
+ * sender which just reset the original transport sender back.
+ */
+
+ axutil_property_t *property = NULL;
+ axis2_transport_out_desc_t *orig_transport_out = NULL;
+ axis2_transport_out_desc_t *sandesha2_transport_out = NULL;
+
+ orig_transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
+ property = axutil_property_create_with_args(env, 0, 0, 0, orig_transport_out);
+ axis2_msg_ctx_set_property(create_seq_msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC,
+ property);
+ sandesha2_transport_out = sandesha2_utils_get_transport_out(env);
+ axis2_msg_ctx_set_transport_out_desc(create_seq_msg_ctx, env, sandesha2_transport_out);
+
+ if(!axis2_engine_send(engine, env, create_seq_msg_ctx))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Engine Send failed");
}
@@ -2479,12 +2491,13 @@
axis2_engine_free(engine, env);
}
- rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
- SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
- // Dual channel
- sandesha2_app_msg_processor_start_create_seq_msg_resender(env, conf_ctx, internal_sequence_id,
- msg_id, is_svr_side, retrans_interval, create_sequence_sender_bean, create_seq_msg_ctx);
- }*/
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+ /* Dual channel */
+ status = sandesha2_app_msg_processor_start_create_seq_msg_resender(env, conf_ctx,
+ internal_sequence_id, msg_id, is_svr_side, retrans_interval,
+ create_sequence_sender_bean, create_seq_msg_ctx);
+ }
if(rm_version)
{
@@ -2497,7 +2510,7 @@
return status;
}
-/*static axis2_status_t
+static axis2_status_t
sandesha2_app_msg_processor_start_create_seq_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
@@ -2538,9 +2551,9 @@
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
return AXIS2_SUCCESS;
-}*/
+}
-/*static void * AXIS2_THREAD_FUNC
+static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_create_seq_msg_worker_function(
axutil_thread_t *thd,
void *data)
@@ -2558,7 +2571,7 @@
axis2_bool_t is_server_side = AXIS2_FALSE;
sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
axis2_char_t *msg_id = NULL;
- // sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+ /* sandesha2_seq_property_bean_t *rms_sequence_bean = NULL; */
axis2_bool_t continue_sending = AXIS2_TRUE;
axis2_transport_out_desc_t *transport_out = NULL;
axis2_transport_sender_t *transport_sender = NULL;
@@ -2589,8 +2602,6 @@
create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
- AXIS2_SLEEP(retrans_interval);
-
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "dam_internal_sequence_id:%s", internal_sequence_id);
find_sender_bean = sandesha2_sender_bean_create(env);
@@ -2636,11 +2647,9 @@
break;
}
- AXIS2_SLEEP(retrans_interval);
-
if(transport_sender)
{
- // This is neccessary to avoid a double free
+ /* This is neccessary to avoid a double free */
axis2_msg_ctx_set_property(create_seq_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
{
@@ -2648,6 +2657,7 @@
}
}
+ AXIS2_SLEEP(retrans_interval);
sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
}
@@ -2687,7 +2697,7 @@
axutil_allocator_switch_to_local_pool(env->allocator);
return NULL;
-}*/
+}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_create_seq_response(
@@ -2794,7 +2804,7 @@
sandesha2_seq_property_bean_t *to_bean = NULL;
sandesha2_seq_property_bean_t *reply_to_bean = NULL;
sandesha2_seq_property_bean_t *from_acks_to_bean = NULL;
- sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+ /*sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;*/
axis2_endpoint_ref_t *to_epr = NULL;
axis2_endpoint_ref_t *reply_to_epr = NULL;
axis2_char_t *from_acks_to_addr = NULL;
@@ -2807,12 +2817,12 @@
axis2_char_t *rm_ns_val = NULL;
sandesha2_msg_number_t *msg_number = NULL;
axis2_msg_ctx_t *req_msg = NULL;
- axis2_char_t *rms_sequence_id = NULL;
+ /*axis2_char_t *rms_sequence_id = NULL;*/
sandesha2_sender_bean_t *app_msg_sender_bean = NULL;
long millisecs = 0;
/*axutil_property_t *property = NULL;*/
axis2_engine_t *engine = NULL;
- sandesha2_identifier_t *identifier = NULL;
+ /*sandesha2_identifier_t *identifier = NULL;*/
axis2_char_t *msg_id = NULL;
axis2_bool_t last_msg = AXIS2_FALSE;
axis2_op_ctx_t *temp_op_ctx = NULL;
@@ -2828,7 +2838,7 @@
axis2_conf_t *conf = NULL;
const axis2_char_t *mep = NULL;
axis2_relates_to_t *relates_to = NULL;
- sandesha2_seq_property_bean_t *relates_to_bean = NULL;
+ /*sandesha2_seq_property_bean_t *relates_to_bean = NULL;*/
axis2_svc_t *svc = NULL;
sandesha2_property_bean_t *property_bean = NULL;
@@ -2877,7 +2887,7 @@
reply_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
- rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ /*rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
while(!rms_sequence_bean)
@@ -2892,7 +2902,7 @@
rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
env));
sandesha2_seq_property_bean_free(rms_sequence_bean, env);
- }
+ }*/
if (to_bean)
{
@@ -2960,10 +2970,10 @@
AXIS2_FREE(env->allocator, reply_to_addr);
}
- if(rms_sequence_id)
+ /*if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
- }
+ }*/
return AXIS2_FAILURE;
}
@@ -3003,10 +3013,10 @@
AXIS2_FREE(env->allocator, reply_to_addr);
}
- if(rms_sequence_id)
+ /*if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
- }
+ }*/
return AXIS2_FAILURE;
}
@@ -3047,19 +3057,19 @@
}
}
- if(!rms_sequence_id)
+ /*if(!rms_sequence_id)
{
- rms_sequence_id = axutil_strdup(env, SANDESHA2_TEMP_SEQ_ID); /* Why should we do this?:damitha */
- }
+ rms_sequence_id = axutil_strdup(env, SANDESHA2_TEMP_SEQ_ID); // Why should we do this?:damitha
+ }*/
- identifier = sandesha2_identifier_create(env, rm_ns_val);
+ /*identifier = sandesha2_identifier_create(env, rm_ns_val);
sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
sandesha2_seq_set_identifier(seq, env, identifier);
- sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
+ sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);*/
/* TODO add_ack_requested */
- sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+ /*sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);*/
op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
if(op_ctx)
@@ -3110,17 +3120,16 @@
sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
+ sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
- if(!rms_sequence_id)
+ /*if(!rms_sequence_id)
{
sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_FALSE);
}
else
{
sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
- /*property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
- axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);*/
- }
+ }*/
sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
@@ -3136,6 +3145,36 @@
if(is_svr_side && sandesha2_utils_is_anon_uri(env, from_acks_to_addr) && (!to_addr ||
sandesha2_utils_is_anon_uri(env, to_addr)))
{
+ sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+ axis2_char_t *rms_sequence_id = NULL;
+ sandesha2_identifier_t *identifier = NULL;
+
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+ while(!rms_sequence_bean)
+ {
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+ AXIS2_SLEEP(1);
+ }
+
+ if(rms_sequence_bean)
+ {
+ rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
+ env));
+ sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+ }
+
+ identifier = sandesha2_identifier_create(env, rm_ns_val);
+ sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
+ sandesha2_seq_set_identifier(seq, env, identifier);
+ sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
+
+ /* TODO add_ack_requested */
+
+ sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+
sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
if(req_rm_msg_ctx)
{
@@ -3222,10 +3261,10 @@
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
- if(rms_sequence_id)
+ /*if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
- }
+ }*/
return AXIS2_FAILURE;
}
@@ -3233,13 +3272,8 @@
axis2_msg_ctx_set_current_handler_index(app_msg_ctx, env,
axis2_msg_ctx_get_current_handler_index(app_msg_ctx, env) + 1);
- if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
- {
- sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr,
- seq_prop_mgr, sender_mgr);
- }
- if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
+ /*if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
{
if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
{
@@ -3253,26 +3287,83 @@
sandesha2_seq_property_bean_free(replay_bean, env);
}
}
- }
+ }*/
conf = axis2_conf_ctx_get_conf(conf_ctx, env);
retrans_interval = sandesha2_property_bean_get_retrans_interval(property_bean, env);
- relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id,
+ /*relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id,
SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
if(relates_to_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
sandesha2_seq_property_bean_free(relates_to_bean, env);
- }
+ }*/
- if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
+ if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_anon_uri(env, reply_to_addr)))
{
/* Client side and oneway */
axis2_transport_out_desc_t *transport_out = NULL;
axis2_transport_sender_t *transport_sender = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
+ sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+ axis2_char_t *rms_sequence_id = NULL;
+ sandesha2_identifier_t *identifier = NULL;
+ sandesha2_seq_property_bean_t *relates_to_bean = NULL;
+
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+ while(!rms_sequence_bean)
+ {
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+ AXIS2_SLEEP(1);
+ }
+
+ if(rms_sequence_bean)
+ {
+ rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
+ env));
+ sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+ }
+
+ relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id,
+ SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
+ if(relates_to_bean)
+ {
+ sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
+ sandesha2_seq_property_bean_free(relates_to_bean, env);
+ }
+
+ if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
+ {
+ sandesha2_seq_property_bean_t *replay_bean = NULL;
+
+ replay_bean = sandesha2_seq_property_bean_create_with_data(env, rms_sequence_id,
+ SANDESHA2_SEQ_PROP_1_0_REPLAY, NULL);
+ sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, replay_bean);
+ if(replay_bean)
+ {
+ sandesha2_seq_property_bean_free(replay_bean, env);
+ }
+ }
+
+ if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
+ {
+ sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr,
+ seq_prop_mgr, sender_mgr);
+ }
+
+ identifier = sandesha2_identifier_create(env, rm_ns_val);
+ sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
+ sandesha2_seq_set_identifier(seq, env, identifier);
+ sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
+ /* TODO add_ack_requested */
+
+ sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+
engine = axis2_engine_create(env, conf_ctx);
if(axis2_engine_resume_send(engine, env, app_msg_ctx))
{
@@ -3393,16 +3484,32 @@
}
else /* Not client side or twoway client side*/
{
+ /* This is actually a trick that get the msg_ctx traversed through all the out phases.
+ * Once all the phases are passed it will get hit into the false sandesha2 transport
+ * sender which just reset the original transport sender back.
+ */
+
+ axutil_property_t *property = NULL;
+ axis2_transport_out_desc_t *orig_transport_out = NULL;
+ axis2_transport_out_desc_t *sandesha2_transport_out = NULL;
+
+ orig_transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
+ property = axutil_property_create_with_args(env, 0, 0, 0, orig_transport_out);
+ axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC,
+ property);
+ sandesha2_transport_out = sandesha2_utils_get_transport_out(env);
+ axis2_msg_ctx_set_transport_out_desc(app_msg_ctx, env, sandesha2_transport_out);
+
axis2_msg_ctx_increment_ref(app_msg_ctx, env);
engine = axis2_engine_create(env, conf_ctx);
- if(axis2_engine_resume_send(engine, env, app_msg_ctx))
- {
+ if(!axis2_engine_resume_send(engine, env, app_msg_ctx))
+ /*{
if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
status = sandesha2_app_msg_processor_process_app_msg_response(env, app_msg_ctx);
}
}
- else
+ else*/
{
AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] Engine resume send failed");
}
@@ -3417,8 +3524,8 @@
/* If not (single channel) spawn a thread and see whether acknowledgment has arrived through the
* sandesha2_sender_mgr_get_application_msg_to_send() function. If it has arrived exit from
* the thread.*/
- sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx, internal_sequence_id,
- msg_id, is_svr_side, retrans_interval, app_msg_ctx);
+ status = sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx,
+ internal_sequence_id, msg_id, is_svr_side, retrans_interval, app_msg_ctx, seq);
}
if(rm_version)
@@ -3435,10 +3542,10 @@
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
- if(rms_sequence_id)
+ /*if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
- }
+ }*/
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[Sandesha2] Exit:sandesha2_app_msg_processor_send_app_msg");
@@ -3453,7 +3560,8 @@
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval,
- axis2_msg_ctx_t *app_msg_ctx)
+ axis2_msg_ctx_t *app_msg_ctx,
+ sandesha2_seq_t *sequence)
{
axutil_thread_t *worker_thread = NULL;
sandesha2_app_msg_processor_args_t *args = NULL;
@@ -3469,6 +3577,7 @@
args->retrans_interval = retrans_interval;
args->is_server_side = is_server_side;
args->msg_ctx = app_msg_ctx;
+ args->sequence = sequence;
worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
@@ -3505,6 +3614,17 @@
sandesha2_sender_bean_t *sender_bean = NULL;
axis2_char_t *msg_id = NULL;
axis2_status_t status = AXIS2_FAILURE;
+ axis2_svc_t *svc = NULL;
+ axis2_char_t *key = NULL;
+ axis2_msg_ctx_t *app_msg_ctx = NULL;
+ sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
+ axis2_char_t *rms_sequence_id = NULL;
+ sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
+ sandesha2_identifier_t *identifier = NULL;
+ sandesha2_seq_property_bean_t *relates_to_bean = NULL;
+ axis2_char_t *rm_version = NULL;
+ axis2_char_t *rm_ns_val = NULL;
+ sandesha2_seq_t *sequence = NULL;
args = (sandesha2_app_msg_processor_args_t*) data;
env = args->env;
@@ -3513,6 +3633,7 @@
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
conf_ctx = args->conf_ctx;
+ sequence = args->sequence;
msg_id = axutil_strdup(env, args->msg_id);
internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
is_server_side = args->is_server_side;
@@ -3523,7 +3644,6 @@
create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
- AXIS2_SLEEP(retrans_interval);
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
@@ -3534,12 +3654,91 @@
return NULL;
}
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+
+ key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
+ app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx,
+ AXIS2_TRUE);
+ svc = axis2_msg_ctx_get_svc(app_msg_ctx, env);
+
+ while(!rms_sequence_bean)
+ {
+ axis2_bool_t continue_sending = AXIS2_TRUE;
+
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean,
+ conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
+ sandesha2_sender_mgr_update(sender_mgr, env, sender_bean);
+ if(!continue_sending)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Do not continue sending the application message");
+ if(sender_bean)
+ {
+ sandesha2_sender_bean_free(sender_bean, env);
+ }
+
+ if(app_msg_ctx)
+ {
+ axis2_msg_ctx_free(app_msg_ctx, env);
+ }
+
+ return NULL;
+ }
+
+ rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
+ AXIS2_SLEEP(1);
+ }
+
+ if(rms_sequence_bean)
+ {
+ rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
+ env));
+ sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+ }
+
+ relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id,
+ SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
+ if(relates_to_bean)
+ {
+ sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
+ sandesha2_seq_property_bean_free(relates_to_bean, env);
+ }
+
+ rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
+
+ rm_version = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
+ if(!rm_version)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Unable to find RM spec version for the rms internal_sequence_id %s",
+ internal_sequence_id);
+
+ return NULL;
+ }
+
+ rm_ns_val = sandesha2_spec_specific_consts_get_rm_ns_val(env, rm_version);
+
+ identifier = sandesha2_identifier_create(env, rm_ns_val);
+ sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
+ sandesha2_seq_set_identifier(sequence, env, identifier);
+ sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, sequence);
+ sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
+
+ /* TODO add_ack_requested */
+
+ if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
+ {
+ sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr,
+ seq_prop_mgr, sender_mgr);
+ }
+
while(AXIS2_TRUE)
{
- axis2_char_t *key = NULL;
- axis2_msg_ctx_t *app_msg_ctx = NULL;
+ /*axis2_char_t *key = NULL;
+ axis2_msg_ctx_t *app_msg_ctx = NULL;*/
- AXIS2_SLEEP(retrans_interval);
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
@@ -3550,7 +3749,7 @@
break;
}
- key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
+ /*key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx,
AXIS2_TRUE);
@@ -3559,14 +3758,15 @@
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] msg_ctx is not present in the store yet.");
- /*msg_ctx is still not stored so try again later.*/
+ // msg_ctx is still not stored so try again later.
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
break;
- }
+ }*/
+
status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr,
sender_mgr, app_msg_ctx);
@@ -3577,11 +3777,6 @@
"[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
internal_sequence_id);
- if(app_msg_ctx)
- {
- axis2_msg_ctx_free(app_msg_ctx, env);
- }
-
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
@@ -3589,15 +3784,12 @@
break;
}
- if(app_msg_ctx)
- {
- axis2_msg_ctx_free(app_msg_ctx, env);
- }
-
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
+
+ AXIS2_SLEEP(retrans_interval);
}
/*if(internal_sequence_id)
@@ -3610,6 +3802,16 @@
AXIS2_FREE(env->allocator, msg_id);
}*/
+ if(app_msg_ctx)
+ {
+ axis2_msg_ctx_free(app_msg_ctx, env);
+ }
+
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
+
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
Modified: webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c?rev=703207&r1=703206&r2=703207&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c (original)
+++ webservices/sandesha/trunk/c/src/transport/sandesha2_transport_sender.c Thu Oct 9 09:57:56 2008
@@ -126,32 +126,36 @@
axutil_property_t *property = NULL;
axis2_transport_out_desc_t *out_desc = NULL;
axis2_transport_out_desc_t *temp_out_desc = NULL;
- axis2_char_t *key = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
+ /*axis2_conf_ctx_t *conf_ctx = NULL;
axis2_conf_t *conf = NULL;
+ axis2_char_t *key = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
- axis2_char_t *dbname = NULL;
+ axis2_char_t *dbname = NULL;*/
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2]Entry:sandesha2_transport_sender_invoke");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_transport_sender_invoke");
AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
- property = axis2_msg_ctx_get_property(msg_ctx, env,
- SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC);
- if(NULL == property || NULL == axutil_property_get_value(property, env))
+
+ property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC);
+ if(!property || !axutil_property_get_value(property, env))
+ {
return AXIS2_FAILURE;
+ }
+
out_desc = axutil_property_get_value(property, env);
temp_out_desc = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
if(temp_out_desc)
+ {
axis2_transport_out_desc_free(temp_out_desc, env);
+ }
+
axis2_msg_ctx_set_transport_out_desc(msg_ctx, env, out_desc);
- property = axis2_msg_ctx_get_property(msg_ctx, env,
- SANDESHA2_MESSAGE_STORE_KEY);
+ /*property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_MESSAGE_STORE_KEY);
- if(NULL == property || NULL == axutil_property_get_value(property, env))
+ if(!property || !axutil_property_get_value(property, env))
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2]SANDESHA2_MESSAGE_STORE_KEY property is NULL");
+ "[sandesha2] SANDESHA2_MESSAGE_STORE_KEY property is NULL");
return AXIS2_FAILURE;
}
@@ -162,13 +166,14 @@
storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
- axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_QUALIFIED_FOR_SENDING,
- property);
+ axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_QUALIFIED_FOR_SENDING, property);
sandesha2_storage_mgr_update_msg_ctx(storage_mgr, env, key, msg_ctx);
if(storage_mgr)
+ {
sandesha2_storage_mgr_free(storage_mgr, env);
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2]Exit:sandesha2_transport_sender_invoke");
+ }*/
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Exit:sandesha2_transport_sender_invoke");
return AXIS2_SUCCESS;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org