You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2007/12/05 07:07:55 UTC
svn commit: r601217 - in /webservices/sandesha/trunk/c:
include/sandesha2_sender_worker.h
src/storage/sqlite/permanent_storage_mgr.c src/util/msg_creator.c
src/workers/sender.c src/workers/sender_worker.c
Author: damitha
Date: Tue Dec 4 22:07:54 2007
New Revision: 601217
URL: http://svn.apache.org/viewvc?rev=601217&view=rev
Log:
Removed the sender worker thread. Instead, the sender uses a function to send messages.
Modified:
webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h
webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c
webservices/sandesha/trunk/c/src/util/msg_creator.c
webservices/sandesha/trunk/c/src/workers/sender.c
webservices/sandesha/trunk/c/src/workers/sender_worker.c
Modified: webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h (original)
+++ webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h Tue Dec 4 22:07:54 2007
@@ -95,6 +95,13 @@
const axutil_env_t *env,
const axis2_bool_t persistent_msg_ctx);
+axis2_status_t
+sandesha2_sender_worker_send(
+ axutil_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *msg_id,
+ axis2_bool_t persistent_msg_ctx);
+
void sandesha2_sender_worker_set_transport_out(
sandesha2_sender_worker_t *sender_worker,
const axutil_env_t *env,
Modified: webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c (original)
+++ webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c Tue Dec 4 22:07:54 2007
@@ -816,9 +816,9 @@
sandesha2_msg_store_bean_t *msg_store_bean = NULL;
storage_mgr_impl = SANDESHA2_INTF_TO_IMPL(storage_mgr);
- /*if(!persistent)
+ if(!persistent)
msg_ctx = (axis2_msg_ctx_t *) axutil_hash_get(
- storage_mgr_impl->msg_ctx_map, key, AXIS2_HASH_KEY_STRING);*/
+ storage_mgr_impl->msg_ctx_map, key, AXIS2_HASH_KEY_STRING);
if(msg_ctx)
return msg_ctx;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2]retrieved from database");
Modified: webservices/sandesha/trunk/c/src/util/msg_creator.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/util/msg_creator.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/util/msg_creator.c (original)
+++ webservices/sandesha/trunk/c/src/util/msg_creator.c Tue Dec 4 22:07:54 2007
@@ -803,6 +803,11 @@
{
axutil_property_set_own_value(new_prop, env, AXIS2_FALSE);
}
+ if(0 == axutil_strcmp(AXIS2_HTTP_CLIENT, key))
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "dam_axis2_http_client");
+ axutil_property_set_own_value(new_prop, env, AXIS2_FALSE);
+ }
axutil_hash_set(new_msg_props, key, AXIS2_HASH_KEY_STRING, new_prop);
}
}
Modified: webservices/sandesha/trunk/c/src/workers/sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender.c Tue Dec 4 22:07:54 2007
@@ -262,7 +262,6 @@
sandesha2_transaction_t *transaction = NULL;
sandesha2_sender_mgr_t *mgr = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
- sandesha2_sender_worker_t *sender_worker = NULL;
axis2_char_t *msg_id = NULL;
transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
@@ -282,14 +281,9 @@
if(msg_id)
{
axis2_bool_t status = AXIS2_TRUE;
- /* Start a sender worker which will work on this message */
- sender_worker = sandesha2_sender_worker_create(env, sender->conf_ctx,
- msg_id);
- sandesha2_sender_worker_run(sender_worker, env,
+ status = sandesha2_sender_worker_send(env, sender->conf_ctx, msg_id,
sender->persistent_msg_ctx);
AXIS2_SLEEP(sleep_time * 2);
- status = sandesha2_sender_worker_get_status(
- sender_worker, env);
if(!status)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
Modified: webservices/sandesha/trunk/c/src/workers/sender_worker.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender_worker.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender_worker.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender_worker.c Tue Dec 4 22:07:54 2007
@@ -299,8 +299,12 @@
{
axutil_allocator_switch_to_global_pool(env->allocator);
if(sender_worker->persistent_msg_ctx)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Retrieving msg_ctx from database");
msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env,
key, sender_worker->conf_ctx, AXIS2_FALSE);
+ }
else
msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env,
key, sender_worker->conf_ctx, AXIS2_TRUE);
@@ -443,7 +447,6 @@
if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx))
{
successfully_sent = AXIS2_TRUE;
- sender_worker->counter++;
}else
{
successfully_sent = AXIS2_FALSE;
@@ -540,6 +543,288 @@
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_sender_worker_worker_func\n");
return NULL;
+
+}
+
+axis2_status_t
+sandesha2_sender_worker_send(
+ axutil_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *msg_id,
+ axis2_bool_t persistent_msg_ctx)
+{
+ sandesha2_sender_worker_t *sender_worker = NULL;
+ sandesha2_storage_mgr_t *storage_mgr = NULL;
+ sandesha2_transaction_t *transaction = NULL;
+ sandesha2_sender_bean_t *sender_worker_bean = NULL;
+ sandesha2_sender_bean_t *bean1 = NULL;
+ sandesha2_sender_mgr_t *sender_mgr = NULL;
+ axis2_char_t *key = NULL;
+ axutil_property_t *property = NULL;
+ axis2_bool_t continue_sending = AXIS2_TRUE;
+ axis2_char_t *qualified_for_sending = NULL;
+ axis2_msg_ctx_t *msg_ctx = NULL;
+ sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
+ sandesha2_property_bean_t *prop_bean = NULL;
+ axutil_array_list_t *msgs_not_to_send = NULL;
+ int msg_type = -1;
+ axis2_transport_out_desc_t *transport_out = NULL;
+ axis2_transport_sender_t *transport_sender = NULL;
+ axis2_bool_t successfully_sent = AXIS2_FALSE;
+ axis2_status_t status = AXIS2_SUCCESS;
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_sender_worker_send");
+
+ storage_mgr = sandesha2_utils_get_storage_mgr(env,
+ conf_ctx, axis2_conf_ctx_get_conf(conf_ctx, env));
+ transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
+ sender_mgr = sandesha2_storage_mgr_get_retrans_mgr(storage_mgr, env);
+ sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+ if(!sender_worker_bean)
+ {
+ AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI,
+ "[sandesha2] sender_worker_bean is NULL");
+ sandesha2_transaction_rollback(transaction, env);
+ return AXIS2_FAILURE;
+ }
+
+ key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_worker_bean, env);
+ if(!msg_ctx)
+ {
+ axutil_allocator_switch_to_global_pool(env->allocator);
+ if(persistent_msg_ctx)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Retrieving msg_ctx from database");
+ msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env,
+ key, conf_ctx, AXIS2_FALSE);
+ }
+ else
+ msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env,
+ key, conf_ctx, AXIS2_TRUE);
+ axutil_allocator_switch_to_local_pool(env->allocator);
+ }
+ if(!msg_ctx)
+ {
+ sandesha2_transaction_rollback(transaction, env);
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx is "\
+ "not present in the store");
+ return AXIS2_FAILURE;
+ }
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION);
+ if(property)
+ axutil_property_set_value(property, env, AXIS2_VALUE_TRUE);
+ else
+ {
+ property = axutil_property_create_with_args(env, 0, 0, 0,
+ AXIS2_VALUE_TRUE);
+ axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION,
+ property);
+ }
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
+ sender_worker_bean, conf_ctx, storage_mgr);
+ sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
+ if(!continue_sending)
+ {
+ status = AXIS2_FAILURE;
+ /* We commit here since we have cleaned the
+ * sending side data and that need to commited */
+ sandesha2_transaction_commit(transaction, env);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Do not continue sending the message");
+ return status;
+ }
+
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ SANDESHA2_QUALIFIED_FOR_SENDING);
+ if(property)
+ qualified_for_sending = axutil_property_get_value(property, env);
+ if(qualified_for_sending && 0 != axutil_strcmp(
+ qualified_for_sending, AXIS2_VALUE_TRUE))
+ {
+ sandesha2_transaction_rollback(transaction, env);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Message is not qualified for sending");
+ return AXIS2_FAILURE;
+ }
+ rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
+
+ prop_bean = sandesha2_utils_get_property_bean(env,
+ axis2_conf_ctx_get_conf(conf_ctx, env));
+ if(prop_bean)
+ msgs_not_to_send = sandesha2_property_bean_get_msg_types_to_drop(
+ prop_bean, env);
+ if(msgs_not_to_send)
+ {
+ int j = 0;
+ axis2_bool_t continue_sending = AXIS2_FALSE;
+
+ for(j = 0; j < axutil_array_list_size(msgs_not_to_send, env); j++)
+ {
+ axis2_char_t *value = NULL;
+ int int_val = -1;
+ int msg_type = -1;
+
+ value = axutil_array_list_get(msgs_not_to_send, env, j);
+ int_val = atoi(value);
+ msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+ if(msg_type == int_val)
+ continue_sending = AXIS2_TRUE;
+ }
+ if(continue_sending)
+ {
+ sandesha2_transaction_rollback(transaction, env);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Continue "\
+ "Sending is true. So returning from Sender Worker");
+ return AXIS2_SUCCESS;
+ }
+ }
+ /*
+ * This method is not implemented yet
+ * update_msg(sender_worker, env, msg_xtx);
+ */
+ msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+ if(msg_type == SANDESHA2_MSG_TYPE_APPLICATION)
+ {
+ sandesha2_seq_t *seq = NULL;
+ axis2_char_t *seq_id = NULL;
+ sandesha2_identifier_t *identifier = NULL;
+
+ seq = (sandesha2_seq_t*)
+ sandesha2_msg_ctx_get_msg_part(rm_msg_ctx,
+ env, SANDESHA2_MSG_PART_SEQ);
+ identifier = sandesha2_seq_get_identifier(seq, env);
+ seq_id = sandesha2_identifier_get_identifier(identifier, env);
+ }
+ if(sandesha2_sender_worker_is_piggybackable_msg_type(sender_worker, env,
+ msg_type) && AXIS2_FALSE ==
+ sandesha2_sender_worker_is_ack_already_piggybacked(sender_worker, env,
+ rm_msg_ctx))
+ {
+ sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx,
+ storage_mgr);
+ }
+
+ if(!transport_out)
+ transport_out = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
+ transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+ if(transport_sender)
+ {
+ sandesha2_transaction_commit(transaction, env);
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION);
+ if(property)
+ axutil_property_set_value(property, env, AXIS2_VALUE_FALSE);
+ else
+ {
+ property = axutil_property_create_with_args(env, 0, 0, 0,
+ AXIS2_VALUE_FALSE);
+ axis2_msg_ctx_set_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION, property);
+ }
+ /* This is neccessary to avoid a double free */
+ axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
+ /* Consider building soap envelope */
+ if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx))
+ {
+ successfully_sent = AXIS2_TRUE;
+ }else
+ {
+ successfully_sent = AXIS2_FALSE;
+ }
+ }
+ transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION);
+ if(property)
+ axutil_property_set_value(property, env, AXIS2_VALUE_TRUE);
+ else
+ {
+ property = axutil_property_create_with_args(env, 0, 0, 0,
+ AXIS2_VALUE_TRUE);
+ axis2_msg_ctx_set_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION, property);
+ }
+ msg_id = sandesha2_sender_bean_get_msg_id((sandesha2_rm_bean_t *)
+ sender_worker_bean, env);
+ bean1 = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+ if(bean1)
+ {
+ axis2_bool_t resend = AXIS2_FALSE;
+
+ resend = sandesha2_sender_bean_is_resend(sender_worker_bean, env);
+ if(resend)
+ {
+ sandesha2_sender_bean_set_sent_count(bean1, env,
+ sandesha2_sender_bean_get_sent_count(sender_worker_bean, env));
+ sandesha2_sender_bean_set_time_to_send(bean1, env,
+ sandesha2_sender_bean_get_time_to_send(sender_worker_bean, env));
+ sandesha2_sender_mgr_update(sender_mgr, env, bean1);
+ }
+ else
+ {
+ axis2_char_t *msg_stored_key = NULL;
+
+ msg_id = sandesha2_sender_bean_get_msg_id((sandesha2_rm_bean_t *)
+ bean1, env);
+ sandesha2_sender_mgr_remove(sender_mgr, env, msg_id);
+ /* Removing the message from the storage */
+ msg_stored_key = sandesha2_sender_bean_get_msg_ctx_ref_key(
+ bean1, env);
+ sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env,
+ msg_stored_key);
+ }
+ }
+ if(successfully_sent)
+ {
+ if(AXIS2_FALSE == axis2_msg_ctx_get_server_side(msg_ctx, env))
+ sandesha2_sender_worker_check_for_sync_res(env, msg_ctx);
+ }
+ msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+ if(SANDESHA2_MSG_TYPE_TERMINATE_SEQ == msg_type)
+ {
+ sandesha2_terminate_seq_t *terminate_seq = NULL;
+ axis2_char_t *seq_id = NULL;
+ axis2_conf_ctx_t *conf_ctx = NULL;
+ axis2_char_t *internal_seq_id = NULL;
+
+ terminate_seq = (sandesha2_terminate_seq_t*)
+ sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, env,
+ SANDESHA2_MSG_PART_TERMINATE_SEQ);
+ seq_id = sandesha2_identifier_get_identifier(
+ sandesha2_terminate_seq_get_identifier(terminate_seq,
+ env), env);
+ conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+ internal_seq_id = sandesha2_utils_get_seq_property(env, seq_id,
+ SANDESHA2_SEQ_PROP_INTERNAL_SEQ_ID, storage_mgr);
+ sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx,
+ internal_seq_id, axis2_msg_ctx_get_server_side(msg_ctx, env),
+ storage_mgr);
+ /* We have no more messages for this sequence. So continue send
+ * status is false*/
+ status = AXIS2_FAILURE;
+ }
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION);
+ if(property)
+ axutil_property_set_value(property, env, AXIS2_VALUE_FALSE);
+ else
+ {
+ property = axutil_property_create_with_args(env, 0, 0, 0,
+ AXIS2_VALUE_FALSE);
+ axis2_msg_ctx_set_property(msg_ctx, env,
+ SANDESHA2_WITHIN_TRANSACTION, property);
+ }
+ /* TODO make transaction handling effective */
+ if(transaction)
+ {
+ sandesha2_transaction_commit(transaction, env);
+ }
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_sender_worker_send");
+ return status;
}
static axis2_bool_t AXIS2_CALL
@@ -586,7 +871,6 @@
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Start:sandesha2_sender_worker_check_for_sync_res");
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
/*property = axis2_msg_ctx_get_property(msg_ctx, env, AXIS2_TRANSPORT_IN);
if(!property)
@@ -646,8 +930,12 @@
res_envelope = axis2_msg_ctx_get_response_soap_envelope(msg_ctx, env);
if(!res_envelope)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Response envelope not found");
res_envelope = axis2_http_transport_utils_create_soap_msg(env, msg_ctx,
soap_ns_uri);
+ }
property = axis2_msg_ctx_get_property(msg_ctx, env,
SANDESHA2_WITHIN_TRANSACTION);
@@ -659,6 +947,8 @@
if(res_envelope)
{
axis2_engine_t *engine = NULL;
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Response envelope found");
axis2_msg_ctx_set_soap_envelope(res_msg_ctx, env, res_envelope);
engine = axis2_engine_create(env, axis2_msg_ctx_get_conf_ctx(msg_ctx,
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org