You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2007/11/29 05:22:59 UTC
svn commit: r599261 - in /webservices/sandesha/trunk/c: config/
samples/rm_ping_1_0/ src/msgprocessors/ src/storage/sqlite/ src/util/
src/workers/
Author: damitha
Date: Wed Nov 28 20:22:54 2007
New Revision: 599261
URL: http://svn.apache.org/viewvc?rev=599261&view=rev
Log:
Changed the sandesha2c sender so that a dedicated sender thread is created for each sequence.
When a sequence terminates, its sender thread terminates as well. Fixed some problems involving the sequence id and the internal sequence id.
Modified:
webservices/sandesha/trunk/c/config/axis2.xml
webservices/sandesha/trunk/c/samples/rm_ping_1_0/rm_ping_1_0.c
webservices/sandesha/trunk/c/src/msgprocessors/ack_msg_processor.c
webservices/sandesha/trunk/c/src/msgprocessors/ack_req_msg_processor.c
webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
webservices/sandesha/trunk/c/src/msgprocessors/create_seq_res_msg_processor.c
webservices/sandesha/trunk/c/src/storage/sqlite/permanent_sender_mgr.c
webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c
webservices/sandesha/trunk/c/src/util/ack_mgr.c
webservices/sandesha/trunk/c/src/util/sandesha2_utils.c
webservices/sandesha/trunk/c/src/util/terminate_mgr.c
webservices/sandesha/trunk/c/src/workers/sender.c
Modified: webservices/sandesha/trunk/c/config/axis2.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/config/axis2.xml?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/config/axis2.xml (original)
+++ webservices/sandesha/trunk/c/config/axis2.xml Wed Nov 28 20:22:54 2007
@@ -25,7 +25,7 @@
<!-- Transport Ins -->
<!-- ================================================= -->
<transportReceiver name="http" class="axis2_http_receiver">
- <parameter name="port" locked="false">6060</parameter>
+ <parameter name="port" locked="false">6061</parameter>
</transportReceiver>
<!-- Uncomment this one with the appropriate papameters to enable the SMTP transport Receiver
@@ -39,7 +39,7 @@
<!--
<transportReceiver name="tcp" class="org.apache.axis2.transport.tcp.TCPServer">
- <parameter name="port" locked="false">6060</parameter>
+ <parameter name="port" locked="false">6061</parameter>
</transportReceiver>
-->
<!-- ================================================= -->
Modified: webservices/sandesha/trunk/c/samples/rm_ping_1_0/rm_ping_1_0.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/samples/rm_ping_1_0/rm_ping_1_0.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/samples/rm_ping_1_0/rm_ping_1_0.c (original)
+++ webservices/sandesha/trunk/c/samples/rm_ping_1_0/rm_ping_1_0.c Wed Nov 28 20:22:54 2007
@@ -52,8 +52,8 @@
axis2_char_t *seq_key = NULL;
/* Set up the environment */
- /*env = axutil_env_create_all("rm_ping.log", AXIS2_LOG_LEVEL_TRACE);*/
- env = axutil_env_create_all("rm_ping.log", AXIS2_LOG_LEVEL_DEBUG);
+ env = axutil_env_create_all("rm_ping.log", AXIS2_LOG_LEVEL_TRACE);
+ /*env = axutil_env_create_all("rm_ping.log", AXIS2_LOG_LEVEL_DEBUG);*/
/* Set end point reference of echo service */
address = "http://127.0.0.1:9090/axis2/services/RMSampleService";
@@ -136,18 +136,18 @@
}
/* Send request */
- payload = build_om_programatically(env, "ping1", seq_key);
+ /*payload = build_om_programatically(env, "ping1", seq_key);
status = axis2_svc_client_send_robust(svc_client, env, payload);
if(status)
printf("\nping client invoke SUCCESSFUL!\n");
- payload = NULL;
+ payload = NULL;*/
/*AXIS2_SLEEP(MAX_COUNT);*/
- payload = build_om_programatically(env, "ping2", seq_key);
+ /*payload = build_om_programatically(env, "ping2", seq_key);
status = axis2_svc_client_send_robust(svc_client, env, payload);
if(status)
printf("\nping client invoke SUCCESSFUL!\n");
- payload = NULL;
+ payload = NULL;*/
/*AXIS2_SLEEP(MAX_COUNT);*/
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
Modified: webservices/sandesha/trunk/c/src/msgprocessors/ack_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/ack_msg_processor.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/ack_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/ack_msg_processor.c Wed Nov 28 20:22:54 2007
@@ -336,6 +336,8 @@
ack_range_list, highest_out_msg_no);
if(completed)
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2]Sequence "\
+ "%s is completed. So adding terminate msg", out_seq_id);
sandesha2_terminate_mgr_add_terminate_seq_msg(env,
rm_msg_ctx, out_seq_id, int_seq_id, storage_mgr);
}
Modified: webservices/sandesha/trunk/c/src/msgprocessors/ack_req_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/ack_req_msg_processor.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/ack_req_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/ack_req_msg_processor.c Wed Nov 28 20:22:54 2007
@@ -159,6 +159,8 @@
axis2_char_t *wsa_version = NULL;
axis2_char_t *addr_ns_val = NULL;
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_ack_"\
+ "req_msg_processor_process_in_msg");
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
Modified: webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c Wed Nov 28 20:22:54 2007
@@ -926,11 +926,15 @@
send_create_seq = AXIS2_TRUE;
if(is_svr_side)
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Starting the server "\
+ "sequence with internal sequence id %s", internal_seq_id);
sandesha2_seq_mgr_setup_new_client_seq(env, msg_ctx, internal_seq_id,
spec_ver, storage_mgr, AXIS2_FALSE);
}
else
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Starting the client "\
+ "sequence with internal sequence id %s", internal_seq_id);
sandesha2_seq_mgr_setup_new_client_seq(env, msg_ctx, internal_seq_id,
spec_ver, storage_mgr, AXIS2_TRUE);
}
@@ -1241,7 +1245,7 @@
sandesha2_sender_bean_set_time_to_send(create_seq_entry, env, millisecs);
msg_id = sandesha2_msg_ctx_get_msg_id(create_seq_rm_msg, env);
sandesha2_sender_bean_set_msg_id(create_seq_entry, env, msg_id);
- sandesha2_sender_bean_set_seq_id(create_seq_entry, env,
+ sandesha2_sender_bean_set_internal_seq_id(create_seq_entry, env,
internal_seq_id);
sandesha2_sender_bean_set_send(create_seq_entry, env, AXIS2_TRUE);
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_FALSE);
@@ -1442,6 +1446,7 @@
/* TODO add_ack_requested */
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
app_msg_entry = sandesha2_sender_bean_create(env);
+ sandesha2_sender_bean_set_internal_seq_id(app_msg_entry, env, internal_seq_id);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "sandesha to_addr = %s ", to_addr);
if(axis2_msg_ctx_get_server_side(app_msg_ctx, env) &&
sandesha2_utils_is_single_channel(env, rm_version, to_addr))
@@ -1505,7 +1510,6 @@
axis2_msg_ctx_set_property(app_msg_ctx, env,
SANDESHA2_SET_SEND_TO_TRUE, property);
}
- sandesha2_sender_bean_set_seq_id(app_msg_entry, env, internal_seq_id);
temp_op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
axis2_op_ctx_increment_ref(temp_op_ctx, env);
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, app_msg_ctx);
Modified: webservices/sandesha/trunk/c/src/msgprocessors/create_seq_res_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/create_seq_res_msg_processor.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/create_seq_res_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/create_seq_res_msg_processor.c Wed Nov 28 20:22:54 2007
@@ -276,7 +276,6 @@
int_seq_id);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, out_seq_bean);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, int_seq_bean);
-
accept = sandesha2_create_seq_res_get_accept(csr_part, env);
if(accept)
{
Modified: webservices/sandesha/trunk/c/src/storage/sqlite/permanent_sender_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/storage/sqlite/permanent_sender_mgr.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/storage/sqlite/permanent_sender_mgr.c (original)
+++ webservices/sandesha/trunk/c/src/storage/sqlite/permanent_sender_mgr.c Wed Nov 28 20:22:54 2007
@@ -71,7 +71,7 @@
if(0 == axutil_strcmp(col_name[i], "msg_ctx_ref_key"))
if(argv[i])
sandesha2_sender_bean_set_msg_ctx_ref_key(bean, env, argv[i]);
- if(0 == axutil_strcmp(col_name[i], "internal_seq_ID"))
+ if(0 == axutil_strcmp(col_name[i], "internal_seq_id"))
if(argv[i])
sandesha2_sender_bean_set_internal_seq_id(bean, env, argv[i]);
if(0 == axutil_strcmp(col_name[i], "sent_count"))
@@ -128,7 +128,7 @@
if(0 == axutil_strcmp(col_name[i], "msg_ctx_ref_key"))
if(argv[i])
sandesha2_sender_bean_set_msg_ctx_ref_key(bean, env, argv[i]);
- if(0 == axutil_strcmp(col_name[i], "internal_seq_ID"))
+ if(0 == axutil_strcmp(col_name[i], "internal_seq_id"))
if(argv[i])
sandesha2_sender_bean_set_internal_seq_id(bean, env, argv[i]);
if(0 == axutil_strcmp(col_name[i], "sent_count"))
@@ -250,7 +250,6 @@
{
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
- AXIS2_ENV_CHECK(env, NULL);
sender_mgr_impl = AXIS2_MALLOC(env->allocator,
sizeof(sandesha2_permanent_sender_mgr_t));
@@ -307,9 +306,8 @@
axis2_char_t *wsrm_anon_uri = sandesha2_sender_bean_get_wsrm_anon_uri(bean, env);
axis2_char_t *to_address = sandesha2_sender_bean_get_to_address(bean, env);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_insert");
- AXIS2_ENV_CHECK(env, AXIS2_FALSE);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_insert");
AXIS2_PARAM_CHECK(env->error, bean, AXIS2_FALSE);
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
@@ -335,8 +333,8 @@
ret = sandesha2_permanent_bean_mgr_insert(sender_mgr_impl->bean_mgr, env,
(sandesha2_rm_bean_t *) bean, sandesha2_sender_retrieve_callback,
sql_retrieve, sql_update, sql_insert);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_sender_mgr_insert:return:%d", ret);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_sender_mgr_insert:return:%d", ret);
return ret;
}
@@ -349,9 +347,8 @@
axis2_char_t sql_retrieve[256];
axis2_char_t sql_remove[256];
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_remove");
- AXIS2_ENV_CHECK(env, AXIS2_FALSE);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_remove");
AXIS2_PARAM_CHECK(env->error, msg_id, AXIS2_FALSE);
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
sprintf(sql_remove, "delete from sender where msg_id='%s'", msg_id);
@@ -359,8 +356,8 @@
"internal_seq_id, sent_count, msg_no, send, resend, time_to_send, "\
"msg_type, seq_id, wsrm_anon_uri, to_address from sender "\
"where msg_id='%s'", msg_id);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_sender_mgr_remove");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_sender_mgr_remove");
return sandesha2_permanent_bean_mgr_remove(sender_mgr_impl->bean_mgr, env,
sandesha2_sender_retrieve_callback, sql_retrieve, sql_remove);
}
@@ -375,9 +372,8 @@
sandesha2_sender_bean_t *ret = NULL;
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_retrieve");
- AXIS2_ENV_CHECK(env, AXIS2_FALSE);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_retrieve");
AXIS2_PARAM_CHECK(env->error, msg_id, AXIS2_FALSE);
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
@@ -389,8 +385,8 @@
sender_mgr_impl->bean_mgr, env, sandesha2_sender_retrieve_callback,
sql_retrieve);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_sender_mgr_retrieve");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_sender_mgr_retrieve");
return ret;
}
@@ -419,9 +415,8 @@
axis2_char_t *wsrm_anon_uri = sandesha2_sender_bean_get_wsrm_anon_uri(bean, env);
axis2_char_t *to_address = sandesha2_sender_bean_get_to_address(bean, env);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_update");
- AXIS2_ENV_CHECK(env, AXIS2_FALSE);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_update");
AXIS2_PARAM_CHECK(env->error, bean, AXIS2_FALSE);
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
@@ -440,8 +435,8 @@
ret = sandesha2_permanent_bean_mgr_update(sender_mgr_impl->bean_mgr, env,
(sandesha2_rm_bean_t *) bean, sandesha2_sender_retrieve_callback, sql_retrieve, sql_update);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_invoker_mgr_update:return:%d", ret);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_invoker_mgr_update:return:%d", ret);
return ret;
return AXIS2_SUCCESS;
@@ -458,9 +453,8 @@
sandesha2_sender_bean_t *bean = NULL;
axutil_array_list_t *ret = NULL;
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_find_by_internal_seq_id");
- AXIS2_ENV_CHECK(env, NULL);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_find_by_internal_seq_id");
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
bean = sandesha2_sender_bean_create(env);
@@ -475,8 +469,8 @@
sandesha2_sender_count_callback, sql_find, sql_count);
if(bean)
sandesha2_sender_bean_free((sandesha2_rm_bean_t *) bean, env);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_sender_mgr_find_by_internal_seq_id");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_sender_mgr_find_by_internal_seq_id");
return ret;
}
@@ -490,9 +484,8 @@
axis2_char_t *sql_count = NULL;
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
axutil_array_list_t *ret = NULL;
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_find_by_sender_bean");
- AXIS2_ENV_CHECK(env, AXIS2_FALSE);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_find_by_sender_bean");
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
sql_find = "select msg_id, msg_ctx_ref_key, internal_seq_id,"\
"sent_count, msg_no, send, resend, time_to_send, msg_type, seq_id, "\
@@ -501,8 +494,8 @@
ret = sandesha2_permanent_bean_mgr_find(sender_mgr_impl->bean_mgr, env,
(sandesha2_rm_bean_t *) bean, sandesha2_sender_find_callback,
sandesha2_sender_count_callback, sql_find, sql_count);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_sender_mgr_find_by_sender_bean");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_sender_mgr_find_by_sender_bean");
return ret;
}
@@ -515,7 +508,6 @@
axis2_char_t *sql_find = NULL;
axis2_char_t *sql_count = NULL;
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
- AXIS2_ENV_CHECK(env, AXIS2_FALSE);
AXIS2_PARAM_CHECK(env->error, bean, AXIS2_FALSE);
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
sql_find = "select msg_ctx_ref_key, internal_seq_id, "\
@@ -551,8 +543,8 @@
axis2_bool_t is_send = AXIS2_FALSE;
axis2_bool_t temp_is_send = AXIS2_FALSE;
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Entry:sandesha2_permanent_sender_mgr_match");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_permanent_sender_mgr_match");
ref_key = sandesha2_sender_bean_get_msg_ctx_ref_key(
(sandesha2_sender_bean_t *) bean, env);
temp_ref_key = sandesha2_sender_bean_get_msg_ctx_ref_key(
@@ -584,6 +576,9 @@
if(internal_seq_id && temp_internal_seq_id && 0 != axutil_strcmp(
internal_seq_id, temp_internal_seq_id))
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Internal_seq_id %s does "\
+ "not match with temp internal_seq_id %s", internal_seq_id,
+ temp_internal_seq_id);
add = AXIS2_FALSE;
}
msg_no = sandesha2_sender_bean_get_msg_no(
@@ -620,9 +615,8 @@
{
add = AXIS2_FALSE;
}*/
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Exit:sandesha2_permanent_sender_mgr_match:add:%d",
- add);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_permanent_sender_mgr_match");
return add;
}
@@ -640,7 +634,10 @@
axutil_array_list_t *match_list = NULL;
sandesha2_permanent_sender_mgr_t *sender_mgr_impl = NULL;
sandesha2_sender_bean_t *result = NULL;
- AXIS2_ENV_CHECK(env, NULL);
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2]Entry:sandesha2_"\
+ "permanent_sender_mgr_get_next_msg_to_send");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2]Entry:sandesha2_"\
+ "permanent_sender_mgr_get_next_msg_to_send");
sender_mgr_impl = SANDESHA2_INTF_TO_IMPL(sender_mgr);
sandesha2_sender_bean_set_send(matcher, env, AXIS2_TRUE);
Modified: webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c (original)
+++ webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c Wed Nov 28 20:22:54 2007
@@ -821,7 +821,7 @@
storage_mgr_impl->msg_ctx_map, key, AXIS2_HASH_KEY_STRING);
if(msg_ctx)
return msg_ctx;
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "retrieved from database");
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2]retrieved from database");
msg_store_bean = sandesha2_permanent_bean_mgr_retrieve_msg_store_bean(
storage_mgr_impl->bean_mgr, env, key);
if (!msg_store_bean)
Modified: webservices/sandesha/trunk/c/src/util/ack_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/util/ack_mgr.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/util/ack_mgr.c (original)
+++ webservices/sandesha/trunk/c/src/util/ack_mgr.c Wed Nov 28 20:22:54 2007
@@ -181,8 +181,8 @@
ack_msg_ctx, env));
sandesha2_sender_bean_set_resend(ack_bean, env, AXIS2_FALSE);
sandesha2_sender_bean_set_seq_id(ack_bean, env, seq_id);
+ sandesha2_sender_bean_set_internal_seq_id(ack_bean, env, seq_id);
sandesha2_sender_bean_set_send(ack_bean, env, AXIS2_TRUE);
- sandesha2_sender_bean_set_seq_id(ack_bean, env, seq_id);
property = axutil_property_create_with_args(env, AXIS2_SCOPE_REQUEST,
AXIS2_FALSE, 0, AXIS2_VALUE_FALSE);
axis2_msg_ctx_set_property(ack_msg_ctx, env,
Modified: webservices/sandesha/trunk/c/src/util/sandesha2_utils.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/util/sandesha2_utils.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/util/sandesha2_utils.c (original)
+++ webservices/sandesha/trunk/c/src/util/sandesha2_utils.c Wed Nov 28 20:22:54 2007
@@ -401,24 +401,23 @@
const axis2_bool_t persistent)
{
sandesha2_sender_t *sender = NULL;
- axutil_property_t *property = NULL;
axis2_status_t status = AXIS2_FAILURE;
AXIS2_PARAM_CHECK(env->error, conf_ctx, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
- property = axis2_ctx_get_property(axis2_conf_ctx_get_base(conf_ctx, env),
+ /*property = axis2_ctx_get_property(axis2_conf_ctx_get_base(conf_ctx, env),
env, SANDESHA2_SENDER);
if(property)
- sender = axutil_property_get_value(property, env);
+ sender = axutil_property_get_value(property, env);*/
if(!sender)
{
sender = sandesha2_sender_create(env);
- property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION,
+ /*property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION,
AXIS2_FALSE, (void *)sandesha2_sender_free_void_arg, sender);
axis2_ctx_set_property(axis2_conf_ctx_get_base(conf_ctx, env),
- env, SANDESHA2_SENDER, property);
+ env, SANDESHA2_SENDER, property);*/
}
status = sandesha2_sender_run_for_seq(sender, env, conf_ctx, seq_id,
persistent);
Modified: webservices/sandesha/trunk/c/src/util/terminate_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/util/terminate_mgr.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/util/terminate_mgr.c (original)
+++ webservices/sandesha/trunk/c/src/util/terminate_mgr.c Wed Nov 28 20:22:54 2007
@@ -83,7 +83,7 @@
AXIS2_PARAM_CHECK(env->error, conf_ctx, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FAILURE);
-
+
if(!sandesha2_terminate_mgr_rcv_side_clean_map)
{
axutil_allocator_switch_to_global_pool(env->allocator);
@@ -655,7 +655,6 @@
long send_time = -1;
sandesha2_terminate_seq_t *terminate_seq = NULL;
axis2_char_t *seq_id = NULL;
- axis2_char_t *internal_seq_id = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2]Entry:sandesha2_terminate_mgr_add_terminate_seq_msg");
@@ -775,9 +774,8 @@
seq_id = sandesha2_identifier_get_identifier(
sandesha2_terminate_seq_get_identifier(terminate_seq,
env), env);
- internal_seq_id = sandesha2_utils_get_seq_property(env, seq_id,
- SANDESHA2_SEQ_PROP_INTERNAL_SEQ_ID, storage_mgr);
- sandesha2_sender_bean_set_seq_id(terminate_bean, env, internal_seq_id);
+ sandesha2_sender_bean_set_seq_id(terminate_bean, env, seq_id);
+ sandesha2_sender_bean_set_internal_seq_id(terminate_bean, env, int_seq_id);
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_FALSE);
sandesha2_msg_ctx_set_property(terminate_rm_msg, env,
Modified: webservices/sandesha/trunk/c/src/workers/sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender.c?rev=599261&r1=599260&r2=599261&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender.c Wed Nov 28 20:22:54 2007
@@ -55,6 +55,7 @@
axutil_thread_mutex_t *mutex;
int seq_index;
int counter;
+ axis2_char_t *seq_id;
axis2_bool_t persistent_msg_ctx;
};
@@ -75,7 +76,6 @@
const axutil_env_t *env)
{
sandesha2_sender_t *sender = NULL;
- AXIS2_ENV_CHECK(env, NULL);
sender = (sandesha2_sender_t *)AXIS2_MALLOC
(env->allocator,
@@ -92,6 +92,7 @@
sender->mutex = NULL;
sender->counter = 0;
sender->seq_index = -1;
+ sender->seq_id = NULL;
sender->working_seqs = axutil_array_list_create(env,
AXIS2_ARRAY_LIST_DEFAULT_CAPACITY);
@@ -107,7 +108,6 @@
const axutil_env_t *env)
{
sandesha2_sender_t *sender_l = NULL;
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
sender_l = (sandesha2_sender_t *) sender;
return sandesha2_sender_free(sender_l, env);
@@ -118,7 +118,6 @@
sandesha2_sender_t *sender,
const axutil_env_t *env)
{
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
/* Do not free this */
sender->conf_ctx = NULL;
@@ -142,22 +141,9 @@
const axutil_env_t *env,
axis2_char_t *seq_id)
{
- int i = 0;
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
- for(i = 0; i < axutil_array_list_size(sender->working_seqs, env); i++)
- {
- axis2_char_t *tmp_id = NULL;
- tmp_id = axutil_array_list_get(sender->working_seqs, env, i);
- if(0 == axutil_strcmp(seq_id, tmp_id))
- {
- axutil_array_list_remove(sender->working_seqs, env, i);
- break;
- }
- }
- if(0 == axutil_array_list_size(sender->working_seqs, env))
- sender->run_sender = AXIS2_FALSE;
+ sender->run_sender = AXIS2_FALSE;
return AXIS2_SUCCESS;
}
@@ -166,8 +152,6 @@
sandesha2_sender_t *sender,
const axutil_env_t *env)
{
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
-
sender->run_sender = AXIS2_FALSE;
return AXIS2_SUCCESS;
}
@@ -189,23 +173,19 @@
axis2_char_t *seq_id,
const axis2_bool_t persistent)
{
- AXIS2_LOG_INFO(env->log, "Start:sandesha2_sender_run_for_seq");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_sender_run_for_seq");
axutil_thread_mutex_lock(sender->mutex);
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, conf_ctx, AXIS2_FAILURE);
- if(seq_id && !sandesha2_utils_array_list_contains(env,
- sender->working_seqs, seq_id))
- axutil_array_list_add(sender->working_seqs, env, seq_id);
- if(!sender->run_sender)
- {
- sender->conf_ctx = conf_ctx;
- sender->run_sender = AXIS2_TRUE;
- sender->persistent_msg_ctx = persistent;
- sandesha2_sender_run(sender, env);
- }
+ sender->conf_ctx = conf_ctx;
+ sender->run_sender = AXIS2_TRUE;
+ sender->persistent_msg_ctx = persistent;
+ sender->seq_id = seq_id;
+ sandesha2_sender_run(sender, env);
axutil_thread_mutex_unlock(sender->mutex);
- AXIS2_LOG_INFO(env->log, "Exit:sandesha2_sender_run_for_seq");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_sender_run_for_seq");
return AXIS2_SUCCESS;
}
@@ -217,8 +197,8 @@
axutil_thread_t *worker_thread = NULL;
sandesha2_sender_args_t *args = NULL;
- AXIS2_LOG_INFO(env->log, "Start:sandesha2_sender_run");
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_sender_run");
args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_sender_args_t));
args->impl = sender;
@@ -233,7 +213,8 @@
return AXIS2_FAILURE;
}
axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
- AXIS2_LOG_INFO(env->log, "End:sandesha2_sender_run");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_sender_run");
return AXIS2_SUCCESS;
}
@@ -250,60 +231,41 @@
axutil_env_t *env = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
- axis2_bool_t do_sleep = AXIS2_FALSE;
axis2_conf_t *conf = NULL;
+ axis2_module_desc_t *module_desc = NULL;
+ int sleep_time = 0;
+ axutil_qname_t *qname = NULL;
+ axutil_param_t *sleep_time_param = NULL;
+ axis2_char_t *seq_id = NULL;
args = (sandesha2_sender_args_t*)data;
env = axutil_init_thread_env(args->env);
sender = args->impl;
sender = (sandesha2_sender_t*)sender;
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "Start:sandesha2_sender_worker_func");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Entry:sandesha2_sender_worker_func");
conf = axis2_conf_ctx_get_conf(sender->conf_ctx, env);
storage_mgr = sandesha2_utils_get_storage_mgr(env, sender->conf_ctx,
conf);
-
+ qname = axutil_qname_create(env, SANDESHA2_MODULE, NULL, NULL);
+ module_desc = axis2_conf_get_module(conf, env, qname);
+ sleep_time_param = axis2_module_desc_get_param(module_desc, env,
+ SANDESHA2_SENDER_SLEEP);
+ if(sleep_time_param)
+ {
+ sleep_time = AXIS2_ATOI(axutil_param_get_value(sleep_time_param, env));
+ }
+ axutil_qname_free(qname, env);
+ seq_id = sender->seq_id;
while(sender->run_sender)
{
sandesha2_transaction_t *transaction = NULL;
- /* Use when transaction handling is done
- axis2_bool_t rollbacked = AXIS2_FALSE;*/
sandesha2_sender_mgr_t *mgr = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
sandesha2_sender_worker_t *sender_worker = NULL;
axis2_char_t *msg_id = NULL;
- axis2_char_t *seq_id = NULL;
- axis2_module_desc_t *module_desc = NULL;
- axutil_qname_t *qname = NULL;
- axutil_param_t *sleep_time_param = NULL;
- int no_of_seqs = 0, sleep_time = 0;
- no_of_seqs = axutil_array_list_size(sender->working_seqs, env);
- if(sender->seq_index >= no_of_seqs)
- {
- sender->seq_index = 0;
- if(no_of_seqs == 0)
- {
- do_sleep = AXIS2_TRUE;
- continue;
- }
- }
- seq_id = axutil_array_list_get(sender->working_seqs, env,
- sender->seq_index++);
transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
- qname = axutil_qname_create(env, SANDESHA2_MODULE, NULL, NULL);
- module_desc = axis2_conf_get_module(conf, env, qname);
- sleep_time_param = axis2_module_desc_get_param(module_desc, env,
- SANDESHA2_SENDER_SLEEP);
- if(sleep_time_param)
- {
- sleep_time = AXIS2_ATOI(axutil_param_get_value(sleep_time_param, env));
- }
- axutil_qname_free(qname, env);
- if(!transaction)
- {
- AXIS2_SLEEP(sleep_time);
- continue;
- }
seq_prop_mgr = sandesha2_storage_mgr_get_seq_property_mgr(
storage_mgr, env);
mgr = sandesha2_storage_mgr_get_retrans_mgr(storage_mgr, env);
@@ -330,11 +292,14 @@
sender_worker, env);
if(!status)
{
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Stopping the sender for sequence %s", seq_id);
sandesha2_sender_stop_sender_for_seq(sender, env, seq_id);
}
}
}
- AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "Exit:sandesha2_sender_worker_func");
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+ "[sandesha2]Exit:sandesha2_sender_worker_func");
#ifdef AXIS2_SVR_MULTI_THREADED
AXIS2_THREAD_POOL_EXIT_THREAD(env->thread_pool, thd);
#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org