You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/06/17 16:54:04 UTC
svn commit: r668694 - in
/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008:
include/ src/handlers/ src/msgprocessors/ src/storage/sqlite/
Author: damitha
Date: Tue Jun 17 07:54:04 2008
New Revision: 668694
URL: http://svn.apache.org/viewvc?rev=668694&view=rev
Log:
Now dropped messages are responded properly.
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h Tue Jun 17 07:54:04 2008
@@ -79,7 +79,7 @@
sandesha2_sender_bean_set_send (
sandesha2_sender_bean_t *sender,
const axutil_env_t *env,
-axis2_bool_t send);
+ axis2_bool_t send);
axis2_char_t* AXIS2_CALL
sandesha2_sender_bean_get_internal_seq_id (
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c Tue Jun 17 07:54:04 2008
@@ -14,6 +14,7 @@
* limitations under the License.
*/
+#include <axis2_engine.h>
#include <axis2_handler_desc.h>
#include <axutil_array_list.h>
#include <axis2_svc.h>
@@ -58,6 +59,7 @@
sandesha2_in_handler_process_dropped_msg(
struct axis2_handler *handler,
const axutil_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
sandesha2_msg_ctx_t *rm_msg_ctx,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
@@ -176,7 +178,7 @@
if(dropped)
{
- sandesha2_in_handler_process_dropped_msg(handler, env, rm_msg_ctx, storage_mgr,
+ sandesha2_in_handler_process_dropped_msg(handler, env, conf_ctx, rm_msg_ctx, storage_mgr,
seq_prop_mgr, sender_mgr);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx dropped. So return here");
@@ -196,6 +198,7 @@
return AXIS2_SUCCESS;
}
+
/*
* TODO Validate the message
* sandesha2_msg_validator_validate(env, rm_msg_ctx);
@@ -208,6 +211,7 @@
sandesha2_msg_processor_process_in_msg(ack_proc, env, rm_msg_ctx);
sandesha2_msg_processor_free(ack_proc, env);
}*/
+
ack_requested = sandesha2_msg_ctx_get_ack_requested(rm_msg_ctx, env);
if(ack_requested)
{
@@ -404,11 +408,16 @@
return AXIS2_FALSE;
}
-
+/* In this function appropriately respond for the dropeed message. In two way messaging if the
+ * response for the dropped application message is not acknowledged then take the the response
+ * message from the database and append acknowledgment for the dropped message to it.
+ * Otherwise send the acknowledgment for the dropped message.
+ */
static axis2_status_t AXIS2_CALL
sandesha2_in_handler_process_dropped_msg(
struct axis2_handler *handler,
const axutil_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
sandesha2_msg_ctx_t *rm_msg_ctx,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
@@ -425,16 +434,15 @@
sandesha2_seq_t *sequence = NULL;
axis2_char_t *rmd_sequence_id = NULL;
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "came1");
sequence = sandesha2_msg_ctx_get_sequence(rm_msg_ctx, env);
if(sequence)
+ {
rmd_sequence_id = sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(sequence,
env), env);
+ }
if(rmd_sequence_id)
{
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "came2");
-
sandesha2_seq_property_bean_t *rcvd_msgs_bean = NULL;
axis2_char_t *rcvd_msgs_str = NULL;
sandesha2_msg_processor_t *app_msg_processor = NULL;
@@ -444,17 +452,60 @@
if(rcvd_msgs_bean)
{
+ sandesha2_sender_bean_t *sender_bean = NULL;
+ axis2_char_t *internal_sequence_id = NULL;
+ long msg_no = -1;
+ sandesha2_sender_bean_t *find_sender_bean = NULL;
+
rcvd_msgs_str = sandesha2_seq_property_bean_get_value(rcvd_msgs_bean, env);
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "rcvd_msgs_str:%s", rcvd_msgs_str);
+ msg_no = sandesha2_msg_number_get_msg_num(sandesha2_seq_get_msg_num(sequence, env), env);
+ internal_sequence_id = sandesha2_utils_get_internal_sequence_id(env, rmd_sequence_id);
+ find_sender_bean = sandesha2_sender_bean_create(env);
+ sandesha2_sender_bean_set_msg_no(find_sender_bean, env, msg_no);
+ sandesha2_sender_bean_set_internal_seq_id(find_sender_bean, env, internal_sequence_id);
+ sandesha2_sender_bean_set_send(find_sender_bean, env, AXIS2_TRUE);
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "came3");
-
- app_msg_processor = sandesha2_app_msg_processor_create(env);
- sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, rcvd_msgs_str,
+ sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
+ if(sender_bean)
+ {
+ axis2_char_t *storage_key = NULL;
+ axis2_msg_ctx_t *app_msg_ctx = NULL;
+ sandesha2_msg_ctx_t *app_rm_msg_ctx = NULL;
+ axis2_engine_t *engine = NULL;
+ axis2_op_ctx_t *op_ctx = NULL;
+ axis2_msg_ctx_t *in_msg_ctx = NULL;
+
+ storage_key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
+ app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, storage_key,
+ conf_ctx, AXIS2_TRUE);
+
+ in_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
+
+ axis2_msg_ctx_set_transport_out_stream(app_msg_ctx, env,
+ axis2_msg_ctx_get_transport_out_stream(in_msg_ctx, env));
+
+ axis2_msg_ctx_set_out_transport_info(app_msg_ctx, env,
+ axis2_msg_ctx_get_out_transport_info(in_msg_ctx, env));
+
+ op_ctx = axis2_msg_ctx_get_op_ctx(in_msg_ctx, env);
+
+ axis2_op_ctx_set_response_written(op_ctx, env, AXIS2_TRUE);
+
+ app_rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
+ sandesha2_msg_creator_add_ack_msg(env, app_rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
+ engine = axis2_engine_create(env, conf_ctx);
+ axis2_engine_send(engine, env, app_msg_ctx);
+ sandesha2_msg_ctx_free(app_rm_msg_ctx, env);
+ }
+ else
+ {
+ app_msg_processor = sandesha2_app_msg_processor_create(env);
+ sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, rcvd_msgs_str,
rmd_sequence_id, storage_mgr, sender_mgr, seq_prop_mgr);
-
- sandesha2_msg_processor_free(app_msg_processor, env);
+
+ sandesha2_msg_processor_free(app_msg_processor, env);
+ }
}
}
}
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c Tue Jun 17 07:54:04 2008
@@ -2280,6 +2280,7 @@
if (to_bean)
{
to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(to_bean, env));
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_to_addr1:%s", to_addr);
to_epr = axis2_endpoint_ref_create(env, to_addr);
sandesha2_seq_property_bean_free(to_bean, env);
}
@@ -2478,6 +2479,36 @@
sandesha2_seq_property_bean_free(from_acks_to_bean, env);
}
+ app_msg_sender_bean = sandesha2_sender_bean_create(env);
+ sandesha2_sender_bean_set_internal_seq_id(app_msg_sender_bean, env, internal_sequence_id);
+ sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_sender_bean, env, storage_key);
+ millisecs = sandesha2_utils_get_current_time_in_millis(env);
+ sandesha2_sender_bean_set_time_to_send(app_msg_sender_bean, env, millisecs);
+ msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
+ sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
+ sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
+ sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
+
+ if(!rms_sequence_id)
+ {
+ sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_FALSE);
+ }
+ else
+ {
+ sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
+ property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
+ axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
+ }
+
+ /**
+ * When we store application message context as below it should be noted
+ * that at Sandesha2/C client application side this is actually stored in
+ * in-memory whereas in the web service side it is actually stored in
+ * database only.
+ */
+ sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
+ sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, app_msg_ctx);
+
is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
/*
@@ -2495,6 +2526,7 @@
engine = axis2_engine_create(env, conf_ctx);
status = axis2_engine_resume_send(engine, env, app_msg_ctx);
+
if(engine)
{
axis2_engine_free(engine, env);
@@ -2535,36 +2567,6 @@
mep = axis2_op_get_msg_exchange_pattern(op, env);
}
- app_msg_sender_bean = sandesha2_sender_bean_create(env);
- sandesha2_sender_bean_set_internal_seq_id(app_msg_sender_bean, env, internal_sequence_id);
- sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_sender_bean, env, storage_key);
- millisecs = sandesha2_utils_get_current_time_in_millis(env);
- sandesha2_sender_bean_set_time_to_send(app_msg_sender_bean, env, millisecs);
- msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
- sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
- sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
- sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
-
- if(!rms_sequence_id)
- {
- sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_FALSE);
- }
- else
- {
- sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
- property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
- axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
- }
-
- /**
- * When we store application message context as below it should be noted
- * that at Sandesha2/C client application side this is actually stored in
- * in-memory whereas in the web service side it is actually stored in
- * database only.
- */
- sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
- sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, app_msg_ctx);
-
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_sender_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
@@ -2677,14 +2679,14 @@
{
transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
}
- if(transport_sender)
+ if(!transport_sender)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Transport sender could not be retrieved from transport_out");
status = AXIS2_FAILURE;
}
- while(AXIS2_TRUE)
+ while(AXIS2_TRUE && transport_sender)
{
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
@@ -2704,15 +2706,13 @@
}
AXIS2_SLEEP(retrans_interval);
- if(transport_sender)
+
+ /* This is neccessary to avoid a double free */
+ axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
+ if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
{
- /* This is neccessary to avoid a double free */
- axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
- if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
- "[sandesha2] Transport sender invoke failed in sending application message");
- }
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Transport sender invoke failed in sending application message");
}
if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c Tue Jun 17 07:54:04 2008
@@ -128,9 +128,15 @@
if(0 == axutil_strcmp(col_name[i], "op_mep"))
if(argv[i])
sandesha2_msg_store_bean_set_op_mep(bean, env, argv[i]);
+
if(0 == axutil_strcmp(col_name[i], "to_url"))
- if(argv[i])
+ {
+ if(argv[i] && axutil_strcmp("(null)", argv[i]))
+ {
sandesha2_msg_store_bean_set_to_url(bean, env, argv[i]);
+ }
+ }
+
if(0 == axutil_strcmp(col_name[i], "transport_to"))
if(argv[i] && 0 != axutil_strcmp("(null)", argv[i]))
{
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c Tue Jun 17 07:54:04 2008
@@ -552,6 +552,7 @@
" and send = %d", send);
}
sprintf(sql_find + axutil_strlen(sql_find), ";");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_sql_find:%s", sql_find);
find_list = sandesha2_permanent_bean_mgr_find(sender_mgr_impl->bean_mgr, env,
sandesha2_sender_find_callback, sql_find);
@@ -608,10 +609,12 @@
{
sprintf(sql_find + axutil_strlen(sql_find), "time_to_send <= %ld ", time_now);
}*/
+
+ sprintf(sql_find + axutil_strlen(sql_find), "msg_type='%d'", SANDESHA2_MSG_TYPE_APPLICATION);
if(seq_id)
{
- sprintf(sql_find + axutil_strlen(sql_find), "internal_seq_id='%s'", seq_id);
+ sprintf(sql_find + axutil_strlen(sql_find), "and internal_seq_id='%s'", seq_id);
}
if(msg_id)
@@ -619,8 +622,6 @@
sprintf(sql_find + axutil_strlen(sql_find), "and msg_id='%s'", msg_id);
}
- sprintf(sql_find + axutil_strlen(sql_find), "and msg_type='%d'", SANDESHA2_MSG_TYPE_APPLICATION);
-
sprintf(sql_find + axutil_strlen(sql_find), " and send=%d", AXIS2_TRUE);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "sql_find:%s", sql_find);
Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c Tue Jun 17 07:54:04 2008
@@ -457,7 +457,9 @@
axiom_soap_builder_free(soap_builder, env);
return NULL;
}
+
axis2_msg_ctx_set_soap_envelope(msg_ctx, env, soap_envelope);
+
axis2_msg_ctx_set_msg_id(msg_ctx, env, sandesha2_msg_store_bean_get_msg_id(
msg_store_bean, env));
@@ -484,7 +486,7 @@
svc = axis2_conf_get_svc(conf, env, svc_name_str);
if(svc)
axis2_msg_ctx_set_svc(msg_ctx, env, svc);
- else
+ /*else
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Cannot build message "\
"context since service %s is not present", svc_name_str);
@@ -493,7 +495,7 @@
AXIS2_FAILURE);
axiom_soap_builder_free(soap_builder, env);
return NULL;
- }
+ */
}
op_name_str = sandesha2_msg_store_bean_get_op(msg_store_bean, env);
@@ -587,11 +589,13 @@
env);
if(transport_to_str)
{
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_transport_to_str:%s", transport_to_str);
axis2_msg_ctx_set_transport_url(msg_ctx, env, transport_to_str);
}
to_url_str = sandesha2_msg_store_bean_get_to_url(msg_store_bean, env);
if(to_url_str)
{
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_to_url_str:%s", to_url_str);
endpoint_ref = axis2_endpoint_ref_create(env, to_url_str);
axis2_msg_ctx_set_to(msg_ctx, env, endpoint_ref);
}
@@ -776,6 +780,7 @@
if(to)
{
address = (axis2_char_t *) axis2_endpoint_ref_get_address(to, env);
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_address:%s", address);
sandesha2_msg_store_bean_set_to_url(bean, env, address);
}
reply_to = axis2_msg_ctx_get_reply_to(msg_ctx, env);
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org