You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/06/15 11:11:38 UTC
svn commit: r667935 - in
/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008:
include/sandesha2_constants.h src/msgprocessors/ack_msg_processor.c
src/msgprocessors/app_msg_processor.c src/util/terminate_mgr.c
Author: damitha
Date: Sun Jun 15 02:11:37 2008
New Revision: 667935
URL: http://svn.apache.org/viewvc?rev=667935&view=rev
Log:
Fixed terminate resending for replay model.
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_constants.h Sun Jun 15 02:11:37 2008
@@ -290,6 +290,8 @@
#define SANDESHA2_SEQ_PROP_LAST_IN_MESSAGE_ID "LastInMessageId"
#define SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_ID "HighestInMsgId"
+
+ #define SANDESHA2_SEQ_PROP_1_0_REPLAY "Replay1_0"
/**
* SOAP Versions
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c Sun Jun 15 02:11:37 2008
@@ -337,6 +337,7 @@
size = axutil_array_list_size(ack_range_list, env);
}
+ /* Remove application sender beans from database that are acked */
for(i = 0; i < size; i++)
{
sandesha2_ack_range_t *ack_range = NULL;
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c Sun Jun 15 02:11:37 2008
@@ -2172,7 +2172,7 @@
axis2_char_t *rm_ns_val = NULL;
sandesha2_msg_number_t *msg_number = NULL;
axis2_msg_ctx_t *req_msg = NULL;
- axis2_char_t *str_identifier = NULL;
+ axis2_char_t *rms_sequence_id = NULL;
sandesha2_sender_bean_t *app_msg_bean = NULL;
long millisecs = 0;
axutil_property_t *property = NULL;
@@ -2195,6 +2195,7 @@
axutil_param_t *sleep_time_param = NULL;
int sleep_time = 0;
axis2_conf_t *conf = NULL;
+ const axis2_char_t *mep = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[Sandesha2] Entry:sandesha2_app_msg_processor_send_app_msg");
@@ -2223,6 +2224,13 @@
AXIS2_SLEEP(1);
}
+ if(rms_sequence_bean)
+ {
+ rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
+ env));
+ sandesha2_seq_property_bean_free(rms_sequence_bean, env);
+ }
+
if (to_bean)
{
to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(to_bean, env));
@@ -2287,6 +2295,10 @@
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
return AXIS2_FAILURE;
}
@@ -2325,6 +2337,10 @@
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
return AXIS2_FAILURE;
}
@@ -2365,17 +2381,13 @@
}
}
- if(!rms_sequence_bean || !sandesha2_seq_property_bean_get_value(rms_sequence_bean, env))
+ if(!rms_sequence_id)
{
- str_identifier = SANDESHA2_TEMP_SEQ_ID;
- }
- else
- {
- str_identifier = sandesha2_seq_property_bean_get_value(rms_sequence_bean, env);
+ rms_sequence_id = axutil_strdup(env, SANDESHA2_TEMP_SEQ_ID); /* Why should we do this?:damitha */
}
identifier = sandesha2_identifier_create(env, rm_ns_val);
- sandesha2_identifier_set_identifier(identifier, env, str_identifier);
+ sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
sandesha2_seq_set_identifier(seq, env, identifier);
sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, seq);
@@ -2457,6 +2469,10 @@
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
return status;
}
@@ -2474,7 +2490,7 @@
sandesha2_sender_bean_set_msg_no(app_msg_bean, env, msg_num);
sandesha2_sender_bean_set_msg_type(app_msg_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
- if(!rms_sequence_bean || !sandesha2_seq_property_bean_get_value(rms_sequence_bean, env))
+ if(!rms_sequence_id)
{
sandesha2_sender_bean_set_send(app_msg_bean, env, AXIS2_FALSE);
}
@@ -2485,7 +2501,15 @@
axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
}
+
temp_op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
+ if(temp_op_ctx)
+ {
+ axis2_op_t *op = NULL;
+
+ op = axis2_op_ctx_get_op(temp_op_ctx, env);
+ mep = axis2_op_get_msg_exchange_pattern(op, env);
+ }
/**
* When we store application message context as below it should be noted
@@ -2519,6 +2543,10 @@
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
return AXIS2_FAILURE;
}
@@ -2574,6 +2602,19 @@
axis2_transport_sender_t *transport_sender = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
+ if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
+ {
+ sandesha2_seq_property_bean_t *replay_bean = NULL;
+
+ replay_bean = sandesha2_seq_property_bean_create_with_data(env, rms_sequence_id,
+ SANDESHA2_SEQ_PROP_1_0_REPLAY, NULL);
+ sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, replay_bean);
+ if(replay_bean)
+ {
+ sandesha2_seq_property_bean_free(replay_bean, env);
+ }
+ }
+
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
@@ -2600,6 +2641,9 @@
{
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+
+ sandesha2_sender_mgr_update(sender_mgr, env, app_msg_bean);
+
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
@@ -2660,6 +2704,10 @@
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
return status;
}
@@ -2678,6 +2726,10 @@
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
+ if(rms_sequence_id)
+ {
+ AXIS2_FREE(env->allocator, rms_sequence_id);
+ }
/* If not (single channel) spawn a thread and see whether acknowledgment has arrived through the
* sandesha2_sender_mgr_get_application_msg_to_send() function. If it has arrived exit from
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c?rev=667935&r1=667934&r2=667935&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/terminate_mgr.c Sun Jun 15 02:11:37 2008
@@ -76,22 +76,6 @@
const axutil_env_t *env,
axis2_char_t *name);
-static axis2_status_t
-sandesha2_terminate_mgr_resend(
- const axutil_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *msg_id,
- axis2_bool_t is_svr_side,
- sandesha2_storage_mgr_t *storage_mgr,
- sandesha2_seq_property_mgr_t *seq_prop_mgr,
- sandesha2_create_seq_mgr_t *create_seq_mgr,
- sandesha2_sender_mgr_t *sender_mgr);
-
-static axis2_bool_t AXIS2_CALL
-sandesha2_terminate_mgr_check_for_response_msg(
- const axutil_env_t *env,
- axis2_msg_ctx_t *msg_ctx);
-
static axis2_status_t AXIS2_CALL
sandesha2_terminate_mgr_process_terminate_msg_response(
const axutil_env_t *env,
@@ -696,10 +680,6 @@
*/
find_seq_prop_bean = sandesha2_seq_property_bean_create(env);
sandesha2_seq_property_bean_set_seq_id(find_seq_prop_bean, env, internal_sequence_id);
- if(internal_sequence_id)
- {
- AXIS2_FREE(env->allocator, internal_sequence_id);
- }
found_list = sandesha2_seq_property_mgr_find(seq_prop_mgr, env, find_seq_prop_bean);
if(found_list)
@@ -769,7 +749,7 @@
AXIS2_EXTERN axis2_status_t AXIS2_CALL
sandesha2_terminate_mgr_send_terminate_seq_msg(
const axutil_env_t *env,
- sandesha2_msg_ctx_t *rm_msg_ctx,
+ sandesha2_msg_ctx_t *ack_rm_msg_ctx,
axis2_char_t *rms_sequence_id,
axis2_char_t *internal_sequence_id,
sandesha2_storage_mgr_t *storage_mgr,
@@ -777,11 +757,11 @@
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_sender_mgr_t *sender_mgr)
{
- axis2_msg_ctx_t *msg_ctx = NULL;
+ axis2_msg_ctx_t *ack_msg_ctx = NULL;
axis2_msg_ctx_t *terminate_msg_ctx = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
sandesha2_seq_property_bean_t *terminated = NULL;
- sandesha2_msg_ctx_t *terminate_rm_msg = NULL;
+ sandesha2_msg_ctx_t *terminate_rm_msg_ctx = NULL;
axutil_property_t *property = NULL;
axis2_endpoint_ref_t *to_epr = NULL;
sandesha2_seq_property_bean_t *to_bean = NULL;
@@ -790,7 +770,7 @@
axis2_char_t *key = NULL;
sandesha2_sender_bean_t *terminate_bean = NULL;
sandesha2_seq_property_bean_t *terminate_added = NULL;
- axis2_msg_ctx_t *msg_ctx1 = NULL;
+ sandesha2_seq_property_bean_t *replay_bean = NULL;
axis2_engine_t *engine = NULL;
axis2_char_t *temp_action = NULL;
axutil_string_t *soap_action = NULL;
@@ -806,7 +786,7 @@
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_terminate_mgr_send_terminate_seq_msg");
- AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
+ AXIS2_PARAM_CHECK(env->error, ack_rm_msg_ctx, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, rms_sequence_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, internal_sequence_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FAILURE);
@@ -814,8 +794,8 @@
AXIS2_PARAM_CHECK(env->error, create_seq_mgr, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, sender_mgr, AXIS2_FAILURE);
- msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
- conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+ ack_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(ack_rm_msg_ctx, env);
+ conf_ctx = axis2_msg_ctx_get_conf_ctx(ack_msg_ctx, env);
terminated = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rms_sequence_id,
SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
@@ -832,17 +812,17 @@
return AXIS2_SUCCESS;
}
- terminate_rm_msg = sandesha2_msg_creator_create_terminate_seq_msg(env, rm_msg_ctx,
+ terminate_rm_msg_ctx = sandesha2_msg_creator_create_terminate_seq_msg(env, ack_rm_msg_ctx,
rms_sequence_id, internal_sequence_id, seq_prop_mgr);
- if(!terminate_rm_msg)
+ if(!terminate_rm_msg_ctx)
{
return AXIS2_FAILURE;
}
- sandesha2_msg_ctx_set_flow(terminate_rm_msg, env, AXIS2_OUT_FLOW);
+ sandesha2_msg_ctx_set_flow(terminate_rm_msg_ctx, env, AXIS2_OUT_FLOW);
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
- sandesha2_msg_ctx_set_property(terminate_rm_msg, env, SANDESHA2_APPLICATION_PROCESSING_DONE,
+ sandesha2_msg_ctx_set_property(terminate_rm_msg_ctx, env, SANDESHA2_APPLICATION_PROCESSING_DONE,
property);
to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
@@ -859,20 +839,25 @@
to_addr = axis2_endpoint_ref_get_address(to_epr, env);
}
- sandesha2_msg_ctx_set_to(terminate_rm_msg, env, to_epr);
+ sandesha2_msg_ctx_set_to(terminate_rm_msg_ctx, env, to_epr);
rm_ver = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
if(!rm_ver)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Cannot find the rm version for msg");
+ if(terminate_rm_msg_ctx)
+ {
+ sandesha2_msg_ctx_free(terminate_rm_msg_ctx, env);
+ }
+
return AXIS2_FAILURE;
}
- sandesha2_msg_ctx_set_wsa_action(terminate_rm_msg, env,
+ sandesha2_msg_ctx_set_wsa_action(terminate_rm_msg_ctx, env,
sandesha2_spec_specific_consts_get_terminate_seq_action(env, rm_ver));
temp_action = sandesha2_spec_specific_consts_get_terminate_seq_soap_action(env, rm_ver);
soap_action = axutil_string_create(env, temp_action);
- sandesha2_msg_ctx_set_soap_action(terminate_rm_msg, env, soap_action);
+ sandesha2_msg_ctx_set_soap_action(terminate_rm_msg_ctx, env, soap_action);
transport_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_TRANSPORT_TO);
@@ -880,28 +865,25 @@
{
axis2_char_t *value = sandesha2_seq_property_bean_get_value(transport_to_bean, env);
property = axutil_property_create_with_args(env, 0, 0, 0, value);
- sandesha2_msg_ctx_set_property(terminate_rm_msg, env, AXIS2_TRANSPORT_URL, property);
+ sandesha2_msg_ctx_set_property(terminate_rm_msg_ctx, env, AXIS2_TRANSPORT_URL, property);
}
- sandesha2_msg_ctx_add_soap_envelope(terminate_rm_msg, env);
- /* If server side and single channel duplex mode send the terminate sequence
- * message.
+ sandesha2_msg_ctx_add_soap_envelope(terminate_rm_msg_ctx, env);
+
+ terminate_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg_ctx, env);
+
+ /* If server side and single channel duplex mode send the terminate sequence message.
*/
- if(axis2_msg_ctx_get_server_side(msg_ctx, env) && sandesha2_utils_is_rm_1_0_anonymous_acks_to(
- env, rm_ver, to_addr))
+ if(sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_ver, to_addr))
{
- axis2_msg_ctx_t *msg_ctx2 = NULL;
-
- msg_ctx2 = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg, env);
- is_svr_side = axis2_msg_ctx_get_server_side(msg_ctx2, env);
- axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(msg_ctx2, env), env, AXIS2_TRUE);
- axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
- axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(msg_ctx, env), env, AXIS2_TRUE);
+ axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(terminate_msg_ctx, env), env, AXIS2_TRUE);
+ axis2_msg_ctx_set_paused(ack_msg_ctx, env, AXIS2_TRUE);
+ axis2_op_ctx_set_response_written(axis2_msg_ctx_get_op_ctx(ack_msg_ctx, env), env, AXIS2_TRUE);
engine = axis2_engine_create(env, conf_ctx);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] axis2_engine_send");
- axis2_engine_send(engine, env, msg_ctx2);
+ axis2_engine_send(engine, env, terminate_msg_ctx);
if(engine)
{
axis2_engine_free(engine, env);
@@ -909,22 +891,20 @@
}
/* Clean sending side data */
- {
- sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx, internal_sequence_id,
- is_svr_side, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
- terminate_added = sandesha2_seq_property_bean_create(env);
-
- sandesha2_seq_property_bean_set_name(terminate_added, env,
- SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
-
- sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
- sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
- sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
+ sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx, internal_sequence_id,
+ is_svr_side, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+ terminate_added = sandesha2_seq_property_bean_create(env);
- if(terminate_added)
- {
- sandesha2_seq_property_bean_free(terminate_added, env);
- }
+ sandesha2_seq_property_bean_set_name(terminate_added, env,
+ SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
+
+ sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
+ sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
+ sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
+
+ if(terminate_added)
+ {
+ sandesha2_seq_property_bean_free(terminate_added, env);
}
if(rm_ver)
@@ -932,6 +912,11 @@
AXIS2_FREE(env->allocator, rm_ver);
}
+ if(terminate_rm_msg_ctx)
+ {
+ sandesha2_msg_ctx_free(terminate_rm_msg_ctx, env);
+ }
+
return AXIS2_SUCCESS;
}
@@ -943,14 +928,13 @@
key = axutil_uuid_gen(env);
terminate_bean = sandesha2_sender_bean_create(env);
sandesha2_sender_bean_set_msg_ctx_ref_key(terminate_bean, env, key);
- terminate_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg, env);
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, key, terminate_msg_ctx);
property_bean = sandesha2_utils_get_property_bean(env, axis2_conf_ctx_get_conf(conf_ctx, env));
terminate_delay = sandesha2_property_bean_get_terminate_delay(property_bean, env);
send_time = sandesha2_utils_get_current_time_in_millis(env) + terminate_delay;
sandesha2_sender_bean_set_time_to_send(terminate_bean, env, send_time);
- msg_id = sandesha2_msg_ctx_get_msg_id(terminate_rm_msg, env);
+ msg_id = sandesha2_msg_ctx_get_msg_id(terminate_rm_msg_ctx, env);
sandesha2_sender_bean_set_msg_id(terminate_bean, env, msg_id);
sandesha2_sender_bean_set_send(terminate_bean, env, AXIS2_TRUE);
@@ -962,304 +946,158 @@
sandesha2_sender_bean_set_resend(terminate_bean, env, AXIS2_FALSE);
sandesha2_sender_mgr_insert(sender_mgr, env, terminate_bean);
- if(terminate_bean)
- {
- sandesha2_sender_bean_free(terminate_bean, env);
- }
- terminate_added = sandesha2_seq_property_bean_create(env);
- sandesha2_seq_property_bean_set_name(terminate_added, env, SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
- sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
- sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
- sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
-
- if(terminate_added)
- {
- sandesha2_seq_property_bean_free(terminate_added, env);
- }
-
- msg_ctx1 = sandesha2_msg_ctx_get_msg_ctx(terminate_rm_msg, env);
-
property = axutil_property_create_with_args(env, 0, AXIS2_TRUE, 0, key);
- axis2_msg_ctx_set_property(msg_ctx1, env, SANDESHA2_MESSAGE_STORE_KEY, property);
+ axis2_msg_ctx_set_property(terminate_msg_ctx, env, SANDESHA2_MESSAGE_STORE_KEY, property);
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
- axis2_msg_ctx_set_property(msg_ctx1, env, SANDESHA2_SET_SEND_TO_TRUE, property);
+ axis2_msg_ctx_set_property(terminate_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
- reply_to_epr = axis2_msg_ctx_get_to(msg_ctx, env);
+ reply_to_epr = axis2_msg_ctx_get_to(ack_msg_ctx, env);
if(reply_to_epr)
{
- axis2_msg_ctx_set_reply_to(msg_ctx1, env, reply_to_epr);
+ axis2_msg_ctx_set_reply_to(terminate_msg_ctx, env, reply_to_epr);
}
- if(!sandesha2_util_is_ack_already_piggybacked(env, terminate_rm_msg))
+ if(!sandesha2_util_is_ack_already_piggybacked(env, terminate_rm_msg_ctx))
{
- sandesha2_ack_mgr_piggyback_acks_if_present(env, terminate_rm_msg, storage_mgr, seq_prop_mgr,
+ sandesha2_ack_mgr_piggyback_acks_if_present(env, terminate_rm_msg_ctx, storage_mgr, seq_prop_mgr,
sender_mgr);
}
- is_svr_side = sandesha2_msg_ctx_get_server_side(rm_msg_ctx, env);
+ is_svr_side = sandesha2_msg_ctx_get_server_side(ack_rm_msg_ctx, env); /* Do we need this?:damitha */
engine = axis2_engine_create(env, conf_ctx);
- if(AXIS2_SUCCESS == axis2_engine_send(engine, env, msg_ctx1))
+ replay_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
+ rms_sequence_id, SANDESHA2_SEQ_PROP_1_0_REPLAY);
+ if(replay_bean)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] replay on");
+ }
+ if(AXIS2_SUCCESS == axis2_engine_send(engine, env, terminate_msg_ctx))
{
- if(sandesha2_terminate_mgr_check_for_response_msg(env, msg_ctx1))
+ if(replay_bean)
{
axiom_soap_envelope_t *res_envelope = NULL;
axis2_char_t *soap_ns_uri = NULL;
-
- soap_ns_uri = axis2_msg_ctx_get_is_soap_11(msg_ctx1, env) ?
+ axis2_transport_out_desc_t *transport_out = NULL;
+ axis2_transport_sender_t *transport_sender = NULL;
+
+ sandesha2_sender_bean_set_resend(terminate_bean, env, AXIS2_TRUE);
+ soap_ns_uri = axis2_msg_ctx_get_is_soap_11(terminate_msg_ctx, env) ?
AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
- res_envelope = axis2_msg_ctx_get_response_soap_envelope(msg_ctx1, env);
+ res_envelope = axis2_msg_ctx_get_response_soap_envelope(terminate_msg_ctx, env);
if(!res_envelope)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Response envelope not found");
res_envelope = (axiom_soap_envelope_t *) axis2_http_transport_utils_create_soap_msg(env,
- msg_ctx1, soap_ns_uri);
+ terminate_msg_ctx, soap_ns_uri);
}
if(res_envelope)
{
- status = sandesha2_terminate_mgr_process_terminate_msg_response(env, msg_ctx1, storage_mgr);
+ status = sandesha2_terminate_mgr_process_terminate_msg_response(env, terminate_msg_ctx, storage_mgr);
if(AXIS2_SUCCESS != status)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Terminate message response process failed for sequence %s",
internal_sequence_id);
-
- if(engine)
- {
- axis2_engine_free(engine, env);
- }
- if(terminate_rm_msg)
- {
- sandesha2_msg_ctx_free(terminate_rm_msg, env);
- }
-
- return status;
}
}
+ transport_out = axis2_msg_ctx_get_transport_out_desc(terminate_msg_ctx, env);
+ if(transport_out)
+ {
+ transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+ }
while(!res_envelope)
{
+ axis2_bool_t continue_sending = AXIS2_FALSE;
long retrans_delay = -1;
- retrans_delay = sandesha2_property_bean_get_retrans_interval(property_bean, env);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "retrans_delay:%ld", retrans_delay);
- AXIS2_SLEEP(retrans_delay);
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, terminate_bean,
+ conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
- status = sandesha2_terminate_mgr_resend(env, conf_ctx, msg_id, is_svr_side,
- storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+ sandesha2_sender_mgr_update(sender_mgr, env, terminate_bean);
- if(AXIS2_SUCCESS != status)
+ if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
- internal_sequence_id);
+ "[sandesha2] Do not continue sending the terminate sequence message");
break;
}
- }
- }
- }
- if(engine)
- {
- axis2_engine_free(engine, env);
- }
-
- if(terminate_rm_msg)
- {
- sandesha2_msg_ctx_free(terminate_rm_msg, env);
- }
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_terminate_mgr_send_terminate_seq_msg");
+ retrans_delay = sandesha2_property_bean_get_retrans_interval(property_bean, env);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "retrans_delay:%ld", retrans_delay);
+ AXIS2_SLEEP(retrans_delay);
- return status;
-}
+ if(transport_sender)
+ {
+ /* This is neccessary to avoid a double free */
+ axis2_msg_ctx_set_property(terminate_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
+ if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, terminate_msg_ctx))
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Transport sender invoke failed in sending terminate sequence message");
+ }
+ }
-static axis2_status_t
-sandesha2_terminate_mgr_resend(
- const axutil_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *msg_id,
- axis2_bool_t is_svr_side,
- sandesha2_storage_mgr_t *storage_mgr,
- sandesha2_seq_property_mgr_t *seq_prop_mgr,
- sandesha2_create_seq_mgr_t *create_seq_mgr,
- sandesha2_sender_mgr_t *sender_mgr)
-{
- sandesha2_sender_bean_t *sender_worker_bean = NULL;
- sandesha2_sender_bean_t *bean1 = NULL;
- axis2_char_t *key = NULL;
- axis2_bool_t continue_sending = AXIS2_TRUE;
- axis2_msg_ctx_t *msg_ctx = NULL;
- sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
- axis2_transport_out_desc_t *transport_out = NULL;
- axis2_transport_sender_t *transport_sender = NULL;
- axis2_bool_t successfully_sent = AXIS2_FALSE;
- axis2_status_t status = AXIS2_SUCCESS;
- axis2_bool_t resend = AXIS2_FALSE;
+ status = sandesha2_terminate_mgr_process_terminate_msg_response(env, terminate_msg_ctx, storage_mgr);
+ if(AXIS2_SUCCESS != status)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Terminate message response process failed for sequence %s",
+ internal_sequence_id);
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_terminate_mgr_resend");
-
- sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
- if(!sender_worker_bean)
- {
- AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
- return AXIS2_FAILURE;
- }
+ break;
+ }
- key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_worker_bean, env);
- if(is_svr_side)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Retrieving msg_ctx from database");
- msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx, AXIS2_TRUE);
- }
- else
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Retrieving msg_ctx from configuration context");
- msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx,
- AXIS2_FALSE);
- }
+ res_envelope = axis2_msg_ctx_get_response_soap_envelope(terminate_msg_ctx, env);
+ }
- if(!msg_ctx)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx is not present in the store.");
- if(sender_worker_bean)
- {
- sandesha2_sender_bean_free(sender_worker_bean, env);
+ sandesha2_seq_property_bean_free(replay_bean, env);
}
-
- return AXIS2_FAILURE;
}
- continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean,
- conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
- sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
- if(!continue_sending)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Do not continue sending the message");
- if(sender_worker_bean)
- {
- sandesha2_sender_bean_free(sender_worker_bean, env);
- }
-
- return AXIS2_FAILURE;
- }
-
- rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
-
- if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
- {
- sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx, storage_mgr, seq_prop_mgr,
- sender_mgr);
- }
-
- transport_out = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
- if(transport_out)
- {
- transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
- }
- if(transport_sender)
+ if(terminate_bean)
{
- /* This is neccessary to avoid a double free */
- axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
- if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx))
- {
- successfully_sent = AXIS2_TRUE;
- }else
- {
- successfully_sent = AXIS2_FALSE;
- }
+ sandesha2_sender_bean_free(terminate_bean, env);
}
- msg_id = sandesha2_sender_bean_get_msg_id(sender_worker_bean, env);
- bean1 = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
- if(bean1)
- {
- resend = sandesha2_sender_bean_is_resend(sender_worker_bean, env);
- if(resend)
- {
- sandesha2_sender_bean_set_sent_count(bean1, env,
- sandesha2_sender_bean_get_sent_count(sender_worker_bean, env));
- sandesha2_sender_bean_set_time_to_send(bean1, env,
- sandesha2_sender_bean_get_time_to_send(sender_worker_bean, env));
- sandesha2_sender_mgr_update(sender_mgr, env, bean1);
- }
- }
+ sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx, internal_sequence_id, is_svr_side,
+ storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+ terminate_added = sandesha2_seq_property_bean_create(env);
- if(sender_worker_bean)
- {
- sandesha2_sender_bean_free(sender_worker_bean, env);
- }
+ sandesha2_seq_property_bean_set_name(terminate_added, env, SANDESHA2_SEQ_PROP_TERMINATE_ADDED);
- if(successfully_sent)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Message successfully sent");
- if(sandesha2_terminate_mgr_check_for_response_msg(env, msg_ctx))
- {
- status = sandesha2_terminate_mgr_process_terminate_msg_response(env, msg_ctx, storage_mgr);
- if(AXIS2_SUCCESS != status)
- {
- return status;
- }
- }
- }
+ sandesha2_seq_property_bean_set_seq_id(terminate_added, env, rms_sequence_id);
+ sandesha2_seq_property_bean_set_value(terminate_added, env, AXIS2_VALUE_TRUE);
+ sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, terminate_added);
- if(bean1)
+ if(terminate_added)
{
- sandesha2_sender_bean_free(bean1, env);
+ sandesha2_seq_property_bean_free(terminate_added, env);
}
- if(rm_msg_ctx)
+ if(engine)
{
- sandesha2_msg_ctx_free(rm_msg_ctx, env);
+ axis2_engine_free(engine, env);
}
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Exit:sandesha2_terminate_mgr_resend");
-
- return status;
-}
-
-static axis2_bool_t AXIS2_CALL
-sandesha2_terminate_mgr_check_for_response_msg(
- const axutil_env_t *env,
- axis2_msg_ctx_t *msg_ctx)
-{
- axis2_bool_t svr_side = AXIS2_FALSE;
- axis2_char_t *soap_ns_uri = NULL;
-
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_terminate_mgr_check_for_response_msg");
-
- AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
-
- svr_side = axis2_msg_ctx_get_server_side(msg_ctx, env);
- soap_ns_uri = axis2_msg_ctx_get_is_soap_11(msg_ctx, env) ?
- AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
- AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
-
- if(svr_side)
+ if(terminate_rm_msg_ctx)
{
- /* We check and process the sync response only in the application client
- * side.
- */
- return AXIS2_FALSE;
+ sandesha2_msg_ctx_free(terminate_rm_msg_ctx, env);
}
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_terminate_mgr_check_for_response_msg");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Exit:sandesha2_terminate_mgr_send_terminate_seq_msg");
- return AXIS2_TRUE;
+ return status;
}
-
static axis2_status_t AXIS2_CALL
sandesha2_terminate_mgr_process_terminate_msg_response(
const axutil_env_t *env,
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org