You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by da...@apache.org on 2006/11/16 11:53:29 UTC
svn commit: r475678 - in /webservices/sandesha/trunk/c: include/
src/msgprocessors/ src/storage/beans/ src/util/ src/workers/
Author: damitha
Date: Thu Nov 16 02:53:28 2006
New Revision: 475678
URL: http://svn.apache.org/viewvc?view=rev&rev=475678
Log:
Refactored sender.c. Also removed macros.
Added:
webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h
webservices/sandesha/trunk/c/src/workers/sender_worker.c
Modified:
webservices/sandesha/trunk/c/include/sandesha2_sender.h
webservices/sandesha/trunk/c/include/sandesha2_sender_bean.h
webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
webservices/sandesha/trunk/c/src/storage/beans/sender_bean.c
webservices/sandesha/trunk/c/src/util/sandesha2_utils.c
webservices/sandesha/trunk/c/src/workers/Makefile.am
webservices/sandesha/trunk/c/src/workers/sender.c
Modified: webservices/sandesha/trunk/c/include/sandesha2_sender.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/include/sandesha2_sender.h?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/include/sandesha2_sender.h (original)
+++ webservices/sandesha/trunk/c/include/sandesha2_sender.h Thu Nov 16 02:53:28 2006
@@ -34,62 +34,11 @@
{
#endif
-typedef struct sandesha2_sender sandesha2_sender_t;
-typedef struct sandesha2_sender_ops sandesha2_sender_ops_t;
-
-/** @defgroup sandesha2_sender Sender
- * @ingroup sandesha2
- * @{
- */
-
-struct sandesha2_sender_ops
-{
- /**
- * Deallocate memory
- * @return status code
- */
- axis2_status_t (AXIS2_CALL *
- free) (
- sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
- axis2_status_t (AXIS2_CALL *
- stop_for_seq) (
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axis2_char_t *seq_id);
-
- axis2_status_t (AXIS2_CALL *
- stop_sending) (
- sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
- axis2_bool_t (AXIS2_CALL *
- is_sender_started) (
- sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
- axis2_status_t (AXIS2_CALL *
- run) (
- sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
- axis2_status_t (AXIS2_CALL *
- run_for_seq) (
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *seq_id);
-};
-
-struct sandesha2_sender
-{
- sandesha2_sender_ops_t *ops;
-};
+typedef struct sandesha2_sender_t sandesha2_sender_t;
AXIS2_EXTERN sandesha2_sender_t * AXIS2_CALL
sandesha2_sender_create(
- const axis2_env_t *env);
+ const axis2_env_t *env);
/**
* Frees the sender given as a void pointer. This method would cast the
@@ -103,30 +52,39 @@
void *sender,
const axis2_env_t *env);
-#define SANDESHA2_SENDER_FREE(sender, env) \
- (((sandesha2_sender_t *) sender)->ops->free (sender, env))
-
-#define SANDESHA2_SENDER_STOP_FOR_SEQ(sender, env, \
- seq_id) \
- (((sandesha2_sender_t *) sender)->ops->\
- stop_for_seq (sender, env, seq_id))
-
-#define SANDESHA2_SENDER_STOP_SENDING(sender, env) \
- (((sandesha2_sender_t *) sender)->ops->\
- stop_sending (sender, env))
-
-#define SANDESHA2_SENDER_IS_SENDER_STARTED(sender, env) \
- (((sandesha2_sender_t *) sender)->ops->\
- is_sender_started (sender, env))
-
-#define SANDESHA2_SENDER_RUN(sender, env) \
- (((sandesha2_sender_t *) sender)->ops->\
- run (sender, env))
-
-#define SANDESHA2_SENDER_RUN_FOR_SEQ(sender, env, conf_ctx, seq_id) \
- (((sandesha2_sender_t *) sender)->ops->\
- run_for_seq (sender, env, conf_ctx, seq_id))
+axis2_status_t AXIS2_CALL
+sandesha2_sender_free(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env);
+axis2_status_t AXIS2_CALL
+sandesha2_sender_stop_sender_for_seq(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *seq_id);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_stop_sending (
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env);
+
+axis2_bool_t AXIS2_CALL
+sandesha2_sender_is_sender_started(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_run_for_seq
+ (sandesha2_sender_t *sender,
+ const axis2_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *seq_id);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_run (
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env);
+
/** @} */
#ifdef __cplusplus
}
Modified: webservices/sandesha/trunk/c/include/sandesha2_sender_bean.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/include/sandesha2_sender_bean.h?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/include/sandesha2_sender_bean.h (original)
+++ webservices/sandesha/trunk/c/include/sandesha2_sender_bean.h Thu Nov 16 02:53:28 2006
@@ -158,6 +158,17 @@
const axis2_env_t *env,
axis2_char_t *seq_id);
+axis2_char_t* AXIS2_CALL
+sandesha2_sender_bean_get_wsrm_anon_uri(
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env);
+
+void AXIS2_CALL
+sandesha2_sender_bean_set_wsrm_anon_uri (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *anon_uri);
+
#ifdef __cplusplus
}
Added: webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h?view=auto&rev=475678
==============================================================================
--- webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h (added)
+++ webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h Thu Nov 16 02:53:28 2006
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SANDESHA2_SENDER_WORKER_H
+#define SANDESHA2_SENDER_WORKER_H
+
+/**
+ * @file sandesha2_sender_worker.h
+ * @brief Sandesha Sender Worker Interface
+ */
+
+#include <axis2_allocator.h>
+#include <axis2_env.h>
+#include <axis2_error.h>
+#include <axis2_string.h>
+#include <axis2_utils.h>
+#include <axis2_conf_ctx.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+typedef struct sandesha2_sender_worker_t sandesha2_sender_worker_t;
+
+AXIS2_EXTERN sandesha2_sender_worker_t* AXIS2_CALL
+sandesha2_sender_worker_create(
+ const axis2_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *msg_id);
+
+/**
+ * Frees the sender_worker given as a void pointer. This method would cast the
+ * void parameter to a sender_worker pointer and then call its free method.
+ * @param sender_worker pointer to sender_worker as a void pointer
+ * @param env pointer to environment struct
+ * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
+ */
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_free_void_arg(
+ void *sender_worker,
+ const axis2_env_t *env);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_free(
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_stop_sender_worker_for_seq(
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env,
+ axis2_char_t *seq_id);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_stop_sending (
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env);
+
+axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_sender_worker_started(
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_run_for_seq
+ (sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *seq_id);
+
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_run (
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env);
+
+void sandesha2_sender_worker_set_out_transport(
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env,
+ axis2_transport_out_desc_t *transport_out);
+
+/** @} */
+#ifdef __cplusplus
+}
+#endif
+#endif /* SANDESHA2_SENDER_WORKER_H */
Modified: webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/trunk/c/src/msgprocessors/app_msg_processor.c Thu Nov 16 02:53:28 2006
@@ -964,7 +964,7 @@
sandesha2_msg_ctx_t *create_seq_rm_msg = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
sandesha2_create_seq_mgr_t *create_seq_man = NULL;
- sandesha2_sender_mgr_t *retransmitter_man = NULL;
+ sandesha2_sender_mgr_t *retrans_mgr = NULL;
sandesha2_seq_offer_t *seq_offer = NULL;
axis2_msg_ctx_t *create_seq_msg = NULL;
sandesha2_create_seq_bean_t *create_seq_bean = NULL;
@@ -997,8 +997,7 @@
mgr, env);
create_seq_man = sandesha2_storage_mgr_get_create_seq_mgr(
mgr, env);
- retransmitter_man = sandesha2_storage_mgr_get_retrans_mgr
- (mgr, env);
+ retrans_mgr = sandesha2_storage_mgr_get_retrans_mgr(mgr, env);
seq_offer = sandesha2_create_seq_get_seq_offer(create_seq_part, env);
if(NULL != seq_offer)
{
@@ -1056,8 +1055,7 @@
AXIS2_FALSE);
sandesha2_sender_bean_set_msg_type(create_seq_entry, env,
SANDESHA2_MSG_TYPE_CREATE_SEQ);
- sandesha2_sender_mgr_insert(retransmitter_man, env,
- create_seq_entry);
+ sandesha2_sender_mgr_insert(retrans_mgr, env, create_seq_entry);
sandesha2_storage_mgr_store_msg_ctx(mgr, env, str_key, create_seq_msg);
property = axis2_property_create(env);
AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
@@ -1100,7 +1098,7 @@
{
axis2_msg_ctx_t *app_msg_ctx = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
- sandesha2_sender_mgr_t *retransmitter_man = NULL;
+ sandesha2_sender_mgr_t *retrans_mgr = NULL;
sandesha2_seq_property_bean_t *to_bean = NULL;
sandesha2_seq_property_bean_t *reply_to_bean = NULL;
sandesha2_seq_property_bean_t *out_seq_bean = NULL;
@@ -1132,7 +1130,7 @@
seq_prop_mgr = sandesha2_storage_mgr_get_seq_property_mgr(
mgr, env);
- retransmitter_man = sandesha2_storage_mgr_get_retrans_mgr
+ retrans_mgr = sandesha2_storage_mgr_get_retrans_mgr
(mgr, env);
to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_seq_id, SANDESHA2_SEQ_PROP_TO_EPR);
@@ -1281,7 +1279,7 @@
}
sandesha2_sender_bean_set_internal_seq_id(app_msg_entry, env, internal_seq_id);
sandesha2_storage_mgr_store_msg_ctx(mgr, env, storage_key, app_msg_ctx);
- sandesha2_sender_mgr_insert(retransmitter_man, env, app_msg_entry);
+ sandesha2_sender_mgr_insert(retrans_mgr, env, app_msg_entry);
property = axis2_property_create(env);
AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
Modified: webservices/sandesha/trunk/c/src/storage/beans/sender_bean.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/storage/beans/sender_bean.c?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/src/storage/beans/sender_bean.c (original)
+++ webservices/sandesha/trunk/c/src/storage/beans/sender_bean.c Thu Nov 16 02:53:28 2006
@@ -33,6 +33,7 @@
long time_to_send;
int msg_type;
axis2_char_t *seq_id;
+ axis2_char_t *wsrm_anon_uri;
};
@@ -62,19 +63,21 @@
sender->time_to_send = -1;
sender->msg_type = 0;
sender->seq_id = NULL;
+ sender->wsrm_anon_uri = NULL;
return sender;
}
AXIS2_EXTERN sandesha2_sender_bean_t* AXIS2_CALL
-sandesha2_sender_bean_create_with_data(const axis2_env_t *env,
- axis2_char_t *msg_id,
- axis2_char_t *key,
- axis2_bool_t send,
- long time_to_send,
- axis2_char_t *int_seq_id,
- long msg_no)
+sandesha2_sender_bean_create_with_data(
+ const axis2_env_t *env,
+ axis2_char_t *msg_id,
+ axis2_char_t *key,
+ axis2_bool_t send,
+ long time_to_send,
+ axis2_char_t *int_seq_id,
+ long msg_no)
{
sandesha2_sender_bean_t *sender = NULL;
@@ -100,6 +103,7 @@
sender->time_to_send = time_to_send;
sender->msg_type = 0;
sender->seq_id = NULL;
+ sender->wsrm_anon_uri = NULL;
return sender;
}
@@ -114,42 +118,44 @@
AXIS2_FREE(env->allocator, sender->msg_context_ref_key);
sender->msg_context_ref_key = NULL;
}
-
if(sender->msg_id)
{
AXIS2_FREE(env->allocator, sender->msg_id);
sender->msg_id = NULL;
}
-
if(sender->internal_seq_id)
{
AXIS2_FREE(env->allocator, sender->internal_seq_id);
sender->internal_seq_id = NULL;
}
-
if(sender->seq_id)
{
AXIS2_FREE(env->allocator, sender->seq_id);
sender->seq_id = NULL;
}
+ if(sender->wsrm_anon_uri)
+ {
+ AXIS2_FREE(env->allocator, sender->wsrm_anon_uri);
+ sender->wsrm_anon_uri = NULL;
+ }
return AXIS2_SUCCESS;
}
axis2_char_t* AXIS2_CALL
- sandesha2_sender_bean_get_msg_ctx_ref_key (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_msg_ctx_ref_key (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->msg_context_ref_key;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_msg_ctx_ref_key (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- axis2_char_t *ref_key)
+sandesha2_sender_bean_set_msg_ctx_ref_key (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *ref_key)
{
if(sender->msg_context_ref_key)
{
@@ -162,18 +168,18 @@
}
axis2_char_t* AXIS2_CALL
- sandesha2_sender_bean_get_msg_id (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_msg_id (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->msg_id;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_msg_id (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- axis2_char_t *msg_id)
+sandesha2_sender_bean_set_msg_id (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *msg_id)
{
if(sender->msg_id)
{
@@ -186,9 +192,9 @@
axis2_bool_t AXIS2_CALL
- sandesha2_sender_bean_is_send (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_is_send (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->send;
}
@@ -196,28 +202,28 @@
void AXIS2_CALL
- sandesha2_sender_bean_set_send (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- axis2_bool_t send)
+sandesha2_sender_bean_set_send (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_bool_t send)
{
sender->send = send;
}
axis2_char_t* AXIS2_CALL
- sandesha2_sender_bean_get_internal_seq_id (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_internal_seq_id (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->internal_seq_id;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_internal_seq_id (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- axis2_char_t *int_seq_id)
+sandesha2_sender_bean_set_internal_seq_id (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *int_seq_id)
{
if(sender->internal_seq_id)
{
@@ -229,98 +235,98 @@
}
int AXIS2_CALL
- sandesha2_sender_bean_get_sent_count (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_sent_count (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->sent_count;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_sent_count (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- int sent_count)
+sandesha2_sender_bean_set_sent_count (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ int sent_count)
{
sender->sent_count = sent_count;
}
long AXIS2_CALL
- sandesha2_sender_bean_get_msg_no (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_msg_no (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->msg_no;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_msg_no (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- long msg_no)
+sandesha2_sender_bean_set_msg_no (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ long msg_no)
{
sender->msg_no = msg_no;
}
axis2_bool_t AXIS2_CALL
- sandesha2_sender_bean_is_resend (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_is_resend (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->resend;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_resend (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- axis2_bool_t resend)
+sandesha2_sender_bean_set_resend (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_bool_t resend)
{
sender->resend = resend;
}
long AXIS2_CALL
- sandesha2_sender_bean_get_time_to_send (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_time_to_send (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->time_to_send;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_time_to_send (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- long time_to_send)
+sandesha2_sender_bean_set_time_to_send (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ long time_to_send)
{
sender->time_to_send = time_to_send;
}
int AXIS2_CALL
- sandesha2_sender_bean_get_msg_type (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_msg_type (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->msg_type;
}
void AXIS2_CALL
- sandesha2_sender_bean_set_msg_type (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- int msg_type)
+sandesha2_sender_bean_set_msg_type (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ int msg_type)
{
sender->msg_type = msg_type;
}
axis2_char_t* AXIS2_CALL
- sandesha2_sender_bean_get_seq_id(
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_bean_get_seq_id(
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
{
return sender->seq_id;
}
@@ -328,10 +334,10 @@
void AXIS2_CALL
- sandesha2_sender_bean_set_seq_id (
- sandesha2_sender_bean_t *sender,
- const axis2_env_t *env,
- axis2_char_t *seq_id)
+sandesha2_sender_bean_set_seq_id (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *seq_id)
{
if(sender->seq_id)
{
@@ -340,5 +346,28 @@
}
sender->seq_id = (axis2_char_t *)AXIS2_STRDUP(seq_id, env);
+}
+
+axis2_char_t* AXIS2_CALL
+sandesha2_sender_bean_get_wsrm_anon_uri(
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env)
+{
+ return sender->wsrm_anon_uri;
+}
+
+void AXIS2_CALL
+sandesha2_sender_bean_set_wsrm_anon_uri (
+ sandesha2_sender_bean_t *sender,
+ const axis2_env_t *env,
+ axis2_char_t *anon_uri)
+{
+ if(sender->wsrm_anon_uri)
+ {
+ AXIS2_FREE(env->allocator, sender->wsrm_anon_uri);
+ sender->wsrm_anon_uri = NULL;
+ }
+
+ sender->wsrm_anon_uri = (axis2_char_t *)AXIS2_STRDUP(anon_uri, env);
}
Modified: webservices/sandesha/trunk/c/src/util/sandesha2_utils.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/util/sandesha2_utils.c?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/src/util/sandesha2_utils.c (original)
+++ webservices/sandesha/trunk/c/src/util/sandesha2_utils.c Thu Nov 16 02:53:28 2006
@@ -398,7 +398,7 @@
AXIS2_CTX_SET_PROPERTY(AXIS2_CONF_CTX_GET_BASE(conf_ctx, env),
env, SANDESHA2_SENDER, property, AXIS2_FALSE);
}
- SANDESHA2_SENDER_RUN_FOR_SEQ(sender, env, conf_ctx, seq_id);
+ sandesha2_sender_run_for_seq(sender, env, conf_ctx, seq_id);
return AXIS2_SUCCESS;
}
Modified: webservices/sandesha/trunk/c/src/workers/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/Makefile.am?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/Makefile.am (original)
+++ webservices/sandesha/trunk/c/src/workers/Makefile.am Thu Nov 16 02:53:28 2006
@@ -1,7 +1,8 @@
noinst_LTLIBRARIES = libsandesha2_workers.la
libsandesha2_workers_la_SOURCES = in_order_invoker.c\
- sender.c
+ sender.c \
+ sender_worker.c
INCLUDES = -I$(top_builddir)/include \
@AXIS2INC@
Modified: webservices/sandesha/trunk/c/src/workers/sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender.c?view=diff&rev=475678&r1=475677&r2=475678
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender.c Thu Nov 16 02:53:28 2006
@@ -23,6 +23,12 @@
#include <sandesha2_seq_property_mgr.h>
#include <sandesha2_msg_ctx.h>
#include <sandesha2_seq.h>
+#include <sandesha2_sender_worker.h>
+#include <sandesha2_msg_init.h>
+#include <sandesha2_terminate_seq.h>
+#include <sandesha2_sender_worker.h>
+#include <sandesha2_terminate_mgr.h>
+#include <sandesha2_msg_retrans_adjuster.h>
#include <axis2_addr.h>
#include <axis2_engine.h>
#include <stdlib.h>
@@ -31,22 +37,17 @@
#include <axiom_soap_const.h>
#include <axiom_soap_fault.h>
#include <axiom_soap_body.h>
-#include <sandesha2_msg_init.h>
-#include <sandesha2_terminate_seq.h>
-#include <sandesha2_terminate_mgr.h>
-#include <sandesha2_msg_retrans_adjuster.h>
#include <platforms/axis2_platform_auto_sense.h>
/**
* @brief Sender struct impl
* Sandesha2 Sender Invoker
*/
-typedef struct sandesha2_sender_impl sandesha2_sender_impl_t;
typedef struct sandesha2_sender_args sandesha2_sender_args_t;
+#define SENDER_SLEEP_TIME 1
-struct sandesha2_sender_impl
+struct sandesha2_sender_t
{
- sandesha2_sender_t sender;
axis2_conf_ctx_t *conf_ctx;
axis2_bool_t run_sender;
axis2_array_list_t *working_seqs;
@@ -56,115 +57,44 @@
struct sandesha2_sender_args
{
- sandesha2_sender_impl_t *impl;
+ sandesha2_sender_t *impl;
axis2_env_t *env;
};
-#define SANDESHA2_INTF_TO_IMPL(sender) \
- ((sandesha2_sender_impl_t *)(sender))
-
-/***************************** Function headers *******************************/
-axis2_status_t AXIS2_CALL
-sandesha2_sender_stop_sender_for_seq
- (sandesha2_sender_t *sender,
- const axis2_env_t *env, axis2_char_t *seq_id);
-
-axis2_status_t AXIS2_CALL
-sandesha2_sender_stop_sending (sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
-axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_sender_started
- (sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
-axis2_status_t AXIS2_CALL
-sandesha2_sender_run_for_seq
- (sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *seq_id);
-
-axis2_status_t AXIS2_CALL
-sandesha2_sender_run (sandesha2_sender_t *sender,
- const axis2_env_t *env);
-
-axis2_status_t AXIS2_CALL
-sandesha2_sender_check_for_sync_res(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axis2_msg_ctx_t *msg_ctx);
-
-axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_ack_already_piggybacked(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- sandesha2_msg_ctx_t *rm_msg_ctx);
-axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_fault_envelope(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axiom_soap_envelope_t *soap_envelope);
-
-
static void * AXIS2_THREAD_FUNC
-sandesha2_sender_worker_func(axis2_thread_t *thd, void *data);
-
-axis2_status_t AXIS2_CALL
-sandesha2_sender_free(sandesha2_sender_t *sender,
- const axis2_env_t *env);
+sandesha2_sender_worker_func(
+ axis2_thread_t *thd,
+ void *data);
-/***************************** End of function headers ************************/
AXIS2_EXTERN sandesha2_sender_t* AXIS2_CALL
-sandesha2_sender_create(const axis2_env_t *env)
+sandesha2_sender_create(
+ const axis2_env_t *env)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
+ sandesha2_sender_t *sender = NULL;
AXIS2_ENV_CHECK(env, NULL);
- sender_impl = (sandesha2_sender_impl_t *)AXIS2_MALLOC
+ sender = (sandesha2_sender_t *)AXIS2_MALLOC
(env->allocator,
- sizeof(sandesha2_sender_impl_t));
+ sizeof(sandesha2_sender_t));
- if(NULL == sender_impl)
+ if(NULL == sender)
{
AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
return NULL;
}
- sender_impl->conf_ctx = NULL;
- sender_impl->run_sender = AXIS2_FALSE;
- sender_impl->working_seqs = NULL;
- sender_impl->mutex = NULL;
- sender_impl->counter = 0;
- sender_impl->sender.ops = NULL;
-
- sender_impl->sender.ops = AXIS2_MALLOC(env->allocator,
- sizeof(sandesha2_sender_ops_t));
- if(NULL == sender_impl->sender.ops)
- {
- sandesha2_sender_free(
- (sandesha2_sender_t*)sender_impl, env);
- AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
- return NULL;
- }
+ sender->conf_ctx = NULL;
+ sender->run_sender = AXIS2_FALSE;
+ sender->working_seqs = NULL;
+ sender->mutex = NULL;
+ sender->counter = 0;
- sender_impl->working_seqs = axis2_array_list_create(env,
+ sender->working_seqs = axis2_array_list_create(env,
AXIS2_ARRAY_LIST_DEFAULT_CAPACITY);
- sender_impl->mutex = axis2_thread_mutex_create(env->allocator,
+ sender->mutex = axis2_thread_mutex_create(env->allocator,
AXIS2_THREAD_MUTEX_DEFAULT);
- sender_impl->sender.ops->stop_for_seq =
- sandesha2_sender_stop_sender_for_seq;
- sender_impl->sender.ops->stop_sending =
- sandesha2_sender_stop_sending;
- sender_impl->sender.ops->is_sender_started =
- sandesha2_sender_is_sender_started;
- sender_impl->sender.ops->run_for_seq =
- sandesha2_sender_run_for_seq;
- sender_impl->sender.ops->run = sandesha2_sender_run;
- sender_impl->sender.ops->free = sandesha2_sender_free;
-
- return &(sender_impl->sender);
+ return sender;
}
axis2_status_t AXIS2_CALL
@@ -180,126 +110,106 @@
}
axis2_status_t AXIS2_CALL
-sandesha2_sender_free(sandesha2_sender_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_free(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
- sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
/* Do not free this */
- sender_impl->conf_ctx = NULL;
+ sender->conf_ctx = NULL;
- if(NULL != sender_impl->mutex)
+ if(sender->mutex)
{
- axis2_thread_mutex_destroy(sender_impl->mutex);
- sender_impl->mutex = NULL;
+ axis2_thread_mutex_destroy(sender->mutex);
+ sender->mutex = NULL;
}
- if(NULL != sender_impl->working_seqs)
+ if(sender->working_seqs)
{
- AXIS2_ARRAY_LIST_FREE(sender_impl->working_seqs, env);
- sender_impl->working_seqs = NULL;
+ AXIS2_ARRAY_LIST_FREE(sender->working_seqs, env);
+ sender->working_seqs = NULL;
}
- if(NULL != sender->ops)
- {
- AXIS2_FREE(env->allocator, sender->ops);
- sender->ops = NULL;
- }
- AXIS2_FREE(env->allocator, SANDESHA2_INTF_TO_IMPL(sender));
+ AXIS2_FREE(env->allocator, sender);
return AXIS2_SUCCESS;
}
axis2_status_t AXIS2_CALL
-sandesha2_sender_stop_sender_for_seq
- (sandesha2_sender_t *sender,
- const axis2_env_t *env, axis2_char_t *seq_id)
+sandesha2_sender_stop_sender_for_seq(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env, axis2_char_t *seq_id)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
int i = 0;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
- sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
- for(i = 0; i < AXIS2_ARRAY_LIST_SIZE(sender_impl->working_seqs, env); i++)
+ for(i = 0; i < AXIS2_ARRAY_LIST_SIZE(sender->working_seqs, env); i++)
{
axis2_char_t *tmp_id = NULL;
- tmp_id = AXIS2_ARRAY_LIST_GET(sender_impl->working_seqs, env, i);
+ tmp_id = AXIS2_ARRAY_LIST_GET(sender->working_seqs, env, i);
if(0 == AXIS2_STRCMP(seq_id, tmp_id))
{
- AXIS2_ARRAY_LIST_REMOVE(sender_impl->working_seqs, env, i);
+ AXIS2_ARRAY_LIST_REMOVE(sender->working_seqs, env, i);
break;
}
}
- if(0 == AXIS2_ARRAY_LIST_SIZE(sender_impl->working_seqs, env))
- sender_impl->run_sender = AXIS2_FALSE;
+ if(0 == AXIS2_ARRAY_LIST_SIZE(sender->working_seqs, env))
+ sender->run_sender = AXIS2_FALSE;
return AXIS2_SUCCESS;
}
axis2_status_t AXIS2_CALL
-sandesha2_sender_stop_sending (sandesha2_sender_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_stop_sending (
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
- sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
- SANDESHA2_INTF_TO_IMPL(sender)->run_sender = AXIS2_FALSE;
+ sender->run_sender = AXIS2_FALSE;
return AXIS2_SUCCESS;
}
axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_sender_started
- (sandesha2_sender_t *sender,
- const axis2_env_t *env)
+sandesha2_sender_is_sender_started(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
-
AXIS2_ENV_CHECK(env, AXIS2_FALSE);
- sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
- return sender_impl->run_sender;
+ return sender->run_sender;
}
axis2_status_t AXIS2_CALL
-sandesha2_sender_run_for_seq
- (sandesha2_sender_t *sender,
- const axis2_env_t *env, axis2_conf_ctx_t *conf_ctx,
- axis2_char_t *seq_id)
+sandesha2_sender_run_for_seq(
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env, axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *seq_id)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, conf_ctx, AXIS2_FAILURE);
- sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
-
- axis2_thread_mutex_lock(sender_impl->mutex);
- if(seq_id && AXIS2_FALSE == sandesha2_utils_array_list_contains(env,
- sender_impl->working_seqs, seq_id))
- AXIS2_ARRAY_LIST_ADD(sender_impl->working_seqs, env, seq_id);
- if(AXIS2_FALSE == sender_impl->run_sender)
+ if(seq_id && !sandesha2_utils_array_list_contains(env,
+ sender->working_seqs, seq_id))
+ AXIS2_ARRAY_LIST_ADD(sender->working_seqs, env, seq_id);
+ if(!sender->run_sender)
{
- sender_impl->conf_ctx = conf_ctx;
- sender_impl->run_sender = AXIS2_TRUE;
+ sender->conf_ctx = conf_ctx;
+ sender->run_sender = AXIS2_TRUE;
sandesha2_sender_run(sender, env);
}
- axis2_thread_mutex_unlock(sender_impl->mutex);
return AXIS2_SUCCESS;
}
axis2_status_t AXIS2_CALL
sandesha2_sender_run (
- sandesha2_sender_t *sender,
- const axis2_env_t *env)
+ sandesha2_sender_t *sender,
+ const axis2_env_t *env)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
axis2_thread_t *worker_thread = NULL;
sandesha2_sender_args_t *args = NULL;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
- sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
-
args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_sender_args_t));
- args->impl = sender_impl;
+ args->impl = sender;
args->env = (axis2_env_t*)env;
worker_thread = AXIS2_THREAD_POOL_GET_THREAD(env->thread_pool,
@@ -315,182 +225,39 @@
return AXIS2_SUCCESS;
}
-axis2_status_t AXIS2_CALL
-sandesha2_sender_check_for_sync_res(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axis2_msg_ctx_t *msg_ctx)
-{
- axis2_property_t *property = NULL;
- axis2_msg_ctx_t *res_msg_ctx = NULL;
- axis2_op_ctx_t *req_op_ctx = NULL;
- axiom_soap_envelope_t *res_envelope = NULL;
- axis2_char_t *soap_ns_uri = NULL;
-
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
- AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
- property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, AXIS2_TRANSPORT_IN,
- AXIS2_FALSE);
- if(NULL == property)
- return AXIS2_SUCCESS;
-
- res_msg_ctx = axis2_msg_ctx_create(env, AXIS2_MSG_CTX_GET_CONF_CTX(msg_ctx,
- env), AXIS2_MSG_CTX_GET_TRANSPORT_IN_DESC(
- msg_ctx, env), AXIS2_MSG_CTX_GET_TRANSPORT_OUT_DESC(msg_ctx,
- env));
- AXIS2_MSG_CTX_SET_SERVER_SIDE(res_msg_ctx, env, AXIS2_FALSE);
- AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env, AXIS2_TRANSPORT_IN,
- AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, AXIS2_TRANSPORT_IN,
- AXIS2_FALSE), AXIS2_FALSE);
- AXIS2_MSG_CTX_SET_SVC_CTX(res_msg_ctx, env, AXIS2_MSG_CTX_GET_SVC_CTX(
- msg_ctx, env));
- AXIS2_MSG_CTX_SET_SVC_GRP_CTX(res_msg_ctx, env,
- AXIS2_MSG_CTX_GET_SVC_GRP_CTX(msg_ctx, env));
- req_op_ctx = AXIS2_MSG_CTX_GET_OP_CTX(msg_ctx, env);
- if(NULL != req_op_ctx)
- {
- axis2_ctx_t *ctx = NULL;
-
- ctx = AXIS2_OP_CTX_GET_BASE(req_op_ctx, env);
- if(NULL != AXIS2_CTX_GET_PROPERTY(ctx, env, MTOM_RECIVED_CONTENT_TYPE,
- AXIS2_FALSE))
- {
- AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env,
- MTOM_RECIVED_CONTENT_TYPE, AXIS2_CTX_GET_PROPERTY(ctx, env,
- MTOM_RECIVED_CONTENT_TYPE, AXIS2_FALSE), AXIS2_FALSE);
- }
- if(NULL != AXIS2_CTX_GET_PROPERTY(ctx, env, AXIS2_HTTP_CHAR_SET_ENCODING,
- AXIS2_FALSE))
- {
- AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env,
- AXIS2_HTTP_CHAR_SET_ENCODING, AXIS2_CTX_GET_PROPERTY(ctx, env,
- AXIS2_HTTP_CHAR_SET_ENCODING, AXIS2_FALSE), AXIS2_FALSE);
- }
- }
- AXIS2_MSG_CTX_SET_DOING_REST(res_msg_ctx, env, AXIS2_MSG_CTX_GET_DOING_REST(
- msg_ctx, env));
- soap_ns_uri = AXIS2_MSG_CTX_GET_IS_SOAP_11(msg_ctx, env) ?
- AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
- AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
-
- res_envelope = axis2_http_transport_utils_create_soap_msg(env, msg_ctx,
- soap_ns_uri);
-
- AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION,
- AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env,
- SANDESHA2_WITHIN_TRANSACTION, AXIS2_FALSE), AXIS2_FALSE);
- if(NULL != res_envelope)
- {
- axis2_engine_t *engine = NULL;
- AXIS2_MSG_CTX_SET_SOAP_ENVELOPE(res_msg_ctx, env, res_envelope);
-
- engine = axis2_engine_create(env, AXIS2_MSG_CTX_GET_CONF_CTX(msg_ctx,
- env));
- if(AXIS2_TRUE == sandesha2_sender_is_fault_envelope(sender, env,
- res_envelope))
- AXIS2_ENGINE_RECEIVE_FAULT(engine, env, res_msg_ctx);
- else
- AXIS2_ENGINE_RECEIVE(engine, env, res_msg_ctx);
- }
- return AXIS2_SUCCESS;
-}
-
-axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_piggybackable_msg_type(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- int msg_type)
-{
-
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
-
- if(SANDESHA2_MSG_TYPE_ACK == msg_type)
- return AXIS2_FALSE;
-
- return AXIS2_TRUE;
-}
-
-axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_ack_already_piggybacked(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- sandesha2_msg_ctx_t *rm_msg_ctx)
-{
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
- AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
-
- if(NULL != sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, env,
- SANDESHA2_MSG_PART_SEQ_ACKNOWLEDGEMENT))
- return AXIS2_TRUE;
-
- return AXIS2_FALSE;
-}
-
-axis2_bool_t AXIS2_CALL
-sandesha2_sender_is_fault_envelope(
- sandesha2_sender_t *sender,
- const axis2_env_t *env,
- axiom_soap_envelope_t *soap_envelope)
-{
- axiom_soap_fault_t *fault = NULL;
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
- AXIS2_PARAM_CHECK(env->error, soap_envelope, AXIS2_FAILURE);
-
- fault = AXIOM_SOAP_BODY_GET_FAULT(AXIOM_SOAP_ENVELOPE_GET_BODY(soap_envelope,
- env), env);
- if(NULL != fault)
- return AXIS2_TRUE;
-
- return AXIS2_FALSE;
-}
-
/**
* Thread worker function.
*/
static void * AXIS2_THREAD_FUNC
sandesha2_sender_worker_func(
- axis2_thread_t *thd,
- void *data)
+ axis2_thread_t *thd,
+ void *data)
{
- sandesha2_sender_impl_t *sender_impl = NULL;
sandesha2_sender_t *sender = NULL;
sandesha2_sender_args_t *args;
axis2_env_t *env = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
+ sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
args = (sandesha2_sender_args_t*)data;
env = axis2_init_thread_env(args->env);
- sender_impl = args->impl;
- sender = (sandesha2_sender_t*)sender_impl;
+ sender = args->impl;
+ sender = (sandesha2_sender_t*)sender;
- storage_mgr = sandesha2_utils_get_storage_mgr(env,
- sender_impl->conf_ctx,
- AXIS2_CONF_CTX_GET_CONF(sender_impl->conf_ctx, env));
+ storage_mgr = sandesha2_utils_get_storage_mgr(env, sender->conf_ctx,
+ AXIS2_CONF_CTX_GET_CONF(sender->conf_ctx, env));
- while(AXIS2_TRUE == sender_impl->run_sender)
+ while(sender->run_sender)
{
sandesha2_transaction_t *transaction = NULL;
/* Use when transaction handling is done
axis2_bool_t rollbacked = AXIS2_FALSE;*/
sandesha2_sender_mgr_t *mgr = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
- sandesha2_sender_bean_t *bean1 = NULL;
- sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
- axis2_char_t *key = NULL;
- axis2_msg_ctx_t *msg_ctx = NULL;
- axis2_property_t *property = NULL;
- axis2_bool_t continue_sending = AXIS2_TRUE;
- axis2_char_t *qualified_for_sending = NULL;
- sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
- sandesha2_property_bean_t *prop_bean = NULL;
- axis2_array_list_t *msgs_not_to_send = NULL;
- int msg_type = -1;
- axis2_transport_out_desc_t *transport_out = NULL;
- axis2_transport_sender_t *transport_sender = NULL;
- axis2_bool_t successfully_sent = AXIS2_FALSE;
+ sandesha2_sender_worker_t *sender_worker = NULL;
axis2_char_t *msg_id = NULL;
- AXIS2_SLEEP(1);
+ AXIS2_SLEEP(SENDER_SLEEP_TIME);
transaction = sandesha2_storage_mgr_get_transaction(storage_mgr,
env);
mgr = sandesha2_storage_mgr_get_retrans_mgr(storage_mgr, env);
@@ -501,189 +268,18 @@
{
continue;
}
-
- key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
- msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key,
- sender_impl->conf_ctx);
- if(!msg_ctx)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx is "
- "not present in the store");
- break;
- }
- property = axis2_property_create(env);
- AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
- AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
- SANDESHA2_VALUE_TRUE, env));
- AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION,
- property, AXIS2_FALSE);
- continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
- sender_bean, sender_impl->conf_ctx, storage_mgr);
- if(!continue_sending)
- {
- continue;
- }
-
- property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env,
- SANDESHA2_QUALIFIED_FOR_SENDING, AXIS2_FALSE);
- if(property)
- qualified_for_sending = AXIS2_PROPERTY_GET_VALUE(property, env);
-
- if(qualified_for_sending && 0 != AXIS2_STRCMP(
- qualified_for_sending, SANDESHA2_VALUE_TRUE))
- continue;
- rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
-
- prop_bean = sandesha2_utils_get_property_bean_from_op(env,
- AXIS2_MSG_CTX_GET_OP(msg_ctx, env));
- if(prop_bean)
- msgs_not_to_send = sandesha2_property_bean_get_msg_types_to_drop(
- prop_bean, env);
- if(msgs_not_to_send)
- {
- int j = 0;
- axis2_bool_t continue_sending = AXIS2_FALSE;
-
- for(j = 0; j < AXIS2_ARRAY_LIST_SIZE(msgs_not_to_send, env); j++)
- {
- axis2_char_t *value = NULL;
- int int_val = -1;
- int msg_type = -1;
-
- value = AXIS2_ARRAY_LIST_GET(msgs_not_to_send, env, j);
- int_val = atoi(value);
- msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
- if(msg_type == int_val)
- continue_sending = AXIS2_TRUE;
- }
- if(continue_sending)
- continue;
- }
- /*
- * This method is not implemented yet
- * update_msg(sender, env, msg_xtx);
- */
- msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
- if(msg_type == SANDESHA2_MSG_TYPE_APPLICATION)
- {
- sandesha2_seq_t *seq = NULL;
- axis2_char_t *seq_id = NULL;
- sandesha2_identifier_t *identifier = NULL;
-
- seq = (sandesha2_seq_t*)
- sandesha2_msg_ctx_get_msg_part(rm_msg_ctx,
- env, SANDESHA2_MSG_PART_SEQ);
- identifier = sandesha2_seq_get_identifier(seq, env);
- seq_id = sandesha2_identifier_get_identifier(identifier, env);
- }
-
- if(sandesha2_sender_is_piggybackable_msg_type(sender, env,
- msg_type) && AXIS2_FALSE ==
- sandesha2_sender_is_ack_already_piggybacked(sender, env,
- rm_msg_ctx))
- {
- sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx,
- storage_mgr);
- }
-
-
- transport_out = AXIS2_MSG_CTX_GET_TRANSPORT_OUT_DESC(msg_ctx, env);
- transport_sender = AXIS2_TRANSPORT_OUT_DESC_GET_SENDER(transport_out,
- env);
- if(transport_sender)
- {
- SANDESHA2_TRANSACTION_COMMIT(transaction, env);
- property = axis2_property_create(env);
- AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
- AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
- SANDESHA2_VALUE_FALSE, env));
- AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env,
- SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
- /* Consider building soap envelope */
- AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx);
- successfully_sent = AXIS2_TRUE;
- sender_impl->counter++;
- /*printf("**********************counter******************:%d\n", sender_impl->counter);
- if(2 == sender_impl->counter)
- sleep(300000);*/
-
- }
- transaction = sandesha2_storage_mgr_get_transaction(storage_mgr,
- env);
- property = axis2_property_create(env);
- AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
- AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
- SANDESHA2_VALUE_TRUE, env));
- AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env,
- SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
msg_id = sandesha2_sender_bean_get_msg_id(sender_bean, env);
- bean1 = sandesha2_sender_mgr_retrieve(mgr, env, msg_id);
- if(bean1)
+ if(msg_id)
{
- axis2_bool_t resend = AXIS2_FALSE;
-
- resend = sandesha2_sender_bean_is_resend(sender_bean, env);
- if(AXIS2_TRUE == resend)
- {
- sandesha2_sender_bean_set_sent_count(bean1, env,
- sandesha2_sender_bean_get_sent_count(sender_bean, env));
- sandesha2_sender_bean_set_time_to_send(bean1, env,
- sandesha2_sender_bean_get_time_to_send(sender_bean, env));
- sandesha2_sender_mgr_update(mgr, env, bean1);
- }
- else
- {
- axis2_char_t *msg_stored_key = NULL;
-
- msg_id = sandesha2_sender_bean_get_msg_id(bean1, env);
- sandesha2_sender_mgr_remove(mgr, env, msg_id);
- /* Removing the message from the storage */
- msg_stored_key = sandesha2_sender_bean_get_msg_ctx_ref_key(
- bean1, env);
- sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env,
- msg_stored_key);
- }
- }
- if(successfully_sent)
- {
- if(AXIS2_FALSE == AXIS2_MSG_CTX_GET_SERVER_SIDE(msg_ctx, env))
- sandesha2_sender_check_for_sync_res(sender, env, msg_ctx);
- }
- if(SANDESHA2_MSG_TYPE_TERMINATE_SEQ == sandesha2_msg_ctx_get_msg_type(
- rm_msg_ctx, env))
- {
- sandesha2_terminate_seq_t *terminate_seq = NULL;
- axis2_char_t *seq_id = NULL;
- axis2_conf_ctx_t *conf_ctx = NULL;
- axis2_char_t *int_seq_id = NULL;
-
- terminate_seq = (sandesha2_terminate_seq_t*)
- sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, env,
- SANDESHA2_MSG_PART_TERMINATE_SEQ);
- seq_id = sandesha2_identifier_get_identifier(
- sandesha2_terminate_seq_get_identifier(terminate_seq,
- env), env);
- conf_ctx = AXIS2_MSG_CTX_GET_CONF_CTX(msg_ctx, env);
- int_seq_id = sandesha2_utils_get_seq_property(env, seq_id,
- SANDESHA2_SEQ_PROP_INTERNAL_SEQ_ID, storage_mgr);
- sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx,
- int_seq_id, AXIS2_MSG_CTX_GET_SERVER_SIDE(msg_ctx, env),
- storage_mgr);
- }
- property = axis2_property_create(env);
- AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
- AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
- SANDESHA2_VALUE_FALSE, env));
- AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env,
- SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
- /* TODO make transaction handling effective */
- if(transaction)
- {
- SANDESHA2_TRANSACTION_COMMIT(transaction, env);
+ /* Start a sender worker which will work on this message */
+ sender_worker = sandesha2_sender_worker_create(env, sender->conf_ctx,
+ msg_id);
+ sandesha2_sender_worker_run(sender_worker, env);
}
}
- axis2_env_free(env);
-
+ #ifdef AXIS2_SVR_MULTI_THREADED
+ AXIS2_THREAD_POOL_EXIT_THREAD(env->thread_pool, thd);
+ #endif
return NULL;
}
-
+
Added: webservices/sandesha/trunk/c/src/workers/sender_worker.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender_worker.c?view=auto&rev=475678
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender_worker.c (added)
+++ webservices/sandesha/trunk/c/src/workers/sender_worker.c Thu Nov 16 02:53:28 2006
@@ -0,0 +1,556 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sandesha2_sender_worker.h>
+#include <sandesha2_ack_mgr.h>
+#include <sandesha2_constants.h>
+#include <sandesha2_utils.h>
+#include <sandesha2_transaction.h>
+#include <sandesha2_storage_mgr.h>
+#include <sandesha2_sender_mgr.h>
+#include <sandesha2_seq_property_bean.h>
+#include <sandesha2_seq_property_mgr.h>
+#include <sandesha2_msg_ctx.h>
+#include <sandesha2_seq.h>
+#include <axis2_addr.h>
+#include <axis2_engine.h>
+#include <stdlib.h>
+#include <axis2_http_transport.h>
+#include <axis2_http_transport_utils.h>
+#include <axiom_soap_const.h>
+#include <axiom_soap_fault.h>
+#include <axiom_soap_body.h>
+#include <sandesha2_msg_init.h>
+#include <sandesha2_terminate_seq.h>
+#include <sandesha2_terminate_mgr.h>
+#include <sandesha2_msg_retrans_adjuster.h>
+#include <platforms/axis2_platform_auto_sense.h>
+
+/**
+ * @brief Sender worker implementation struct, plus the argument bundle that
+ * sandesha2_sender_worker_run() hands to the worker thread function.
+ */
+typedef struct sandesha2_sender_worker_args sandesha2_sender_worker_args_t;
+ /* State of one sender worker; filled in by sandesha2_sender_worker_create(). */
+struct sandesha2_sender_worker_t
+{
+ axis2_conf_ctx_t *conf_ctx; /* stored as passed to _create(); _free() only resets it to NULL */
+ axis2_thread_mutex_t *mutex; /* created in _create(), destroyed in _free() */
+ int counter; /* incremented after each transport sender invocation */
+ axis2_char_t *msg_id; /* AXIS2_STRDUP copy of the message id; released in _free() */
+ axis2_transport_out_desc_t *transport_out; /* set to NULL in _create() */
+};
+ /* Arguments handed to the worker thread function through the thread pool. */
+struct sandesha2_sender_worker_args
+{
+ sandesha2_sender_worker_t *impl; /* worker the thread operates on */
+ axis2_env_t *env; /* environment of the caller; the thread passes it to axis2_init_thread_env() */
+};
+ /* Forward declarations. run(): starts a detached pool thread for the given worker. */
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_run (
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env);
+ /* Thread entry point started by sandesha2_sender_worker_run(); data is a sandesha2_sender_worker_args_t. */
+static void * AXIS2_THREAD_FUNC
+sandesha2_sender_worker_worker_func(
+ axis2_thread_t *thd, 
+ void *data);
+ /* AXIS2_FALSE for SANDESHA2_MSG_TYPE_ACK, AXIS2_TRUE for every other message type. */
+static axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_piggybackable_msg_type(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ int msg_type);
+ /* AXIS2_TRUE when rm_msg_ctx already has a sequence acknowledgement part. */
+static axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_ack_already_piggybacked(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ sandesha2_msg_ctx_t *rm_msg_ctx);
+ /* Builds a response message context for msg_ctx and passes it to the engine when an envelope is available. */
+static axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_check_for_sync_res(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ axis2_msg_ctx_t *msg_ctx);
+ /* AXIS2_TRUE when the envelope body holds a fault. NOTE(review): not static, unlike the helpers above - confirm whether that is intended. */
+axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_fault_envelope(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ axiom_soap_envelope_t *soap_envelope);
+ /* Destroys the mutex and frees the msg_id copy and the worker itself. */
+axis2_status_t AXIS2_CALL 
+sandesha2_sender_worker_free(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env);
+
+ /* Allocates a sender worker for msg_id. Returns NULL with AXIS2_ERROR_NO_MEMORY set when the worker cannot be allocated. */
+AXIS2_EXTERN sandesha2_sender_worker_t* AXIS2_CALL
+sandesha2_sender_worker_create(
+ const axis2_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *msg_id)
+{
+ sandesha2_sender_worker_t *sender_worker = NULL;
+ AXIS2_ENV_CHECK(env, NULL);
+ 
+ sender_worker = (sandesha2_sender_worker_t *)AXIS2_MALLOC 
+ (env->allocator, 
+ sizeof(sandesha2_sender_worker_t));
+ 
+ if(!sender_worker)
+ {
+ AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
+ return NULL;
+ }
+ sender_worker->conf_ctx = conf_ctx; /* pointer is stored, not copied */
+ sender_worker->mutex = NULL;
+ sender_worker->counter = 0;
+ sender_worker->msg_id = AXIS2_STRDUP(msg_id, env); /* own copy; released in sandesha2_sender_worker_free() */
+ sender_worker->transport_out = NULL;
+ /* NOTE(review): the results of AXIS2_STRDUP and axis2_thread_mutex_create are not checked in this function. */
+ sender_worker->mutex = axis2_thread_mutex_create(env->allocator,
+ AXIS2_THREAD_MUTEX_DEFAULT);
+ 
+ return sender_worker;
+}
+ /* Adapter taking the worker as void*: casts it and forwards to sandesha2_sender_worker_free(), returning its status. */
+axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_free_void_arg(
+ void *sender_worker,
+ const axis2_env_t *env)
+{
+ sandesha2_sender_worker_t *sender_worker_l = NULL;
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ /* The cast below is the only work done here besides the forwarding call. */
+ sender_worker_l = (sandesha2_sender_worker_t *) sender_worker;
+ return sandesha2_sender_worker_free(sender_worker_l, env);
+}
+ /* Releases a sender worker: destroys its mutex, frees its msg_id copy and the struct itself, then returns AXIS2_SUCCESS. */
+axis2_status_t AXIS2_CALL 
+sandesha2_sender_worker_free(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env)
+{
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ /* Do not free this */
+ sender_worker->conf_ctx = NULL; /* NOTE(review): sender_worker is dereferenced without a NULL check */
+ 
+ if(sender_worker->mutex)
+ {
+ axis2_thread_mutex_destroy(sender_worker->mutex);
+ sender_worker->mutex = NULL;
+ }
+ if(sender_worker->msg_id)
+ {
+ AXIS2_FREE(env->allocator, sender_worker->msg_id);
+ sender_worker->msg_id = NULL;
+ }
+ AXIS2_FREE(env->allocator, sender_worker); /* the worker pointer is invalid after this call */
+ return AXIS2_SUCCESS;
+}
+ /* Bundles the worker and env into an args struct and starts a detached pool thread running sandesha2_sender_worker_worker_func. Returns AXIS2_FAILURE when no thread is obtained. */
+axis2_status_t AXIS2_CALL 
+sandesha2_sender_worker_run (
+ sandesha2_sender_worker_t *sender_worker,
+ const axis2_env_t *env)
+{
+ axis2_thread_t *worker_thread = NULL;
+ sandesha2_sender_worker_args_t *args = NULL;
+ 
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ /* NOTE(review): the AXIS2_MALLOC result is used without a NULL check, and args is not freed on the failure path below. */
+ args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_sender_worker_args_t)); 
+ args->impl = sender_worker;
+ args->env = (axis2_env_t*)env; /* const is cast away so the pointer fits the args field */
+
+ worker_thread = AXIS2_THREAD_POOL_GET_THREAD(env->thread_pool,
+ sandesha2_sender_worker_worker_func, (void*)args);
+ if(!worker_thread)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2]Thread creation "
+ "failed sandesha2_sender_worker_run");
+ return AXIS2_FAILURE;
+ }
+ AXIS2_THREAD_POOL_THREAD_DETACH(env->thread_pool, worker_thread); 
+ printf("came1\n"); /* NOTE(review): looks like leftover debug output - confirm and remove */
+ return AXIS2_SUCCESS;
+}
+
+/**
+ * Thread worker function: sends the message stored for the msg_id of the worker, then updates or removes its sender bean.
+ */
+static void * AXIS2_THREAD_FUNC
+sandesha2_sender_worker_worker_func(
+ axis2_thread_t *thd, 
+ void *data)
+{
+ sandesha2_sender_worker_t *sender_worker = NULL;
+ sandesha2_sender_worker_args_t *args;
+ axis2_env_t *env = NULL;
+ sandesha2_storage_mgr_t *storage_mgr = NULL;
+ sandesha2_transaction_t *transaction = NULL;
+ sandesha2_sender_bean_t *sender_worker_bean = NULL;
+ sandesha2_sender_bean_t *bean1 = NULL;
+ sandesha2_sender_mgr_t *sender_mgr = NULL;
+ axis2_char_t *key = NULL;
+ axis2_msg_ctx_t *msg_ctx = NULL;
+ axis2_property_t *property = NULL;
+ axis2_bool_t continue_sending = AXIS2_TRUE;
+ axis2_char_t *qualified_for_sending = NULL;
+ sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
+ sandesha2_property_bean_t *prop_bean = NULL;
+ axis2_array_list_t *msgs_not_to_send = NULL;
+ int msg_type = -1;
+ axis2_transport_out_desc_t *transport_out = NULL;
+ axis2_transport_sender_t *transport_sender = NULL;
+ axis2_bool_t successfully_sent = AXIS2_FALSE;
+ const axis2_char_t *msg_id = NULL;
+ /* NOTE(review): nothing in this function frees args or sender_worker, and every early return below skips the AXIS2_THREAD_POOL_EXIT_THREAD call at the end - confirm who cleans up. */
+ args = (sandesha2_sender_worker_args_t*)data;
+ env = axis2_init_thread_env(args->env);
+ sender_worker = args->impl;
+ msg_id = sender_worker->msg_id;
+ transport_out = sender_worker->transport_out; /* overwritten from msg_ctx before it is used */
+ /* Obtain the storage manager, a transaction and the sender bean stored for msg_id. */
+ storage_mgr = sandesha2_utils_get_storage_mgr(env, 
+ sender_worker->conf_ctx, 
+ AXIS2_CONF_CTX_GET_CONF(sender_worker->conf_ctx, env));
+ 
+ transaction = sandesha2_storage_mgr_get_transaction(storage_mgr,
+ env);
+ sender_mgr = sandesha2_storage_mgr_get_retrans_mgr(storage_mgr, env);
+ sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+ if(!sender_worker_bean)
+ {
+ return NULL;
+ }
+ /* Fetch the stored message context that the bean refers to. */
+ key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_worker_bean, env);
+ msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, 
+ sender_worker->conf_ctx);
+ if(!msg_ctx)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx is "
+ "not present in the store");
+ return NULL;
+ }
+ property = axis2_property_create(env); /* sets SANDESHA2_WITHIN_TRANSACTION to SANDESHA2_VALUE_TRUE on msg_ctx */
+ AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
+ AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
+ SANDESHA2_VALUE_TRUE, env));
+ AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION, 
+ property, AXIS2_FALSE);
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
+ sender_worker_bean, sender_worker->conf_ctx, storage_mgr);
+ if(!continue_sending)
+ {
+ return NULL;
+ }
+ /* Stop when SANDESHA2_QUALIFIED_FOR_SENDING is present with a value other than SANDESHA2_VALUE_TRUE. */
+ property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, 
+ SANDESHA2_QUALIFIED_FOR_SENDING, AXIS2_FALSE);
+ if(property)
+ qualified_for_sending = AXIS2_PROPERTY_GET_VALUE(property, env);
+
+ if(qualified_for_sending && 0 != AXIS2_STRCMP(
+ qualified_for_sending, SANDESHA2_VALUE_TRUE))
+ return NULL;
+ rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
+ /* Stop when the message type is listed by sandesha2_property_bean_get_msg_types_to_drop(). */
+ prop_bean = sandesha2_utils_get_property_bean_from_op(env, 
+ AXIS2_MSG_CTX_GET_OP(msg_ctx, env));
+ if(prop_bean)
+ msgs_not_to_send = sandesha2_property_bean_get_msg_types_to_drop(
+ prop_bean, env);
+ if(msgs_not_to_send)
+ {
+ int j = 0;
+ axis2_bool_t continue_sending = AXIS2_FALSE; /* shadows the outer continue_sending; here TRUE means the message is dropped */
+
+ for(j = 0; j < AXIS2_ARRAY_LIST_SIZE(msgs_not_to_send, env); j++)
+ {
+ axis2_char_t *value = NULL;
+ int int_val = -1;
+ int msg_type = -1; /* shadows the outer msg_type */
+ 
+ value = AXIS2_ARRAY_LIST_GET(msgs_not_to_send, env, j);
+ int_val = atoi(value);
+ msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+ if(msg_type == int_val)
+ continue_sending = AXIS2_TRUE;
+ }
+ if(continue_sending)
+ return NULL;
+ }
+ /* 
+ * This method is not implemented yet
+ * update_msg(sender_worker, env, msg_ctx);
+ */
+ msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+ if(msg_type == SANDESHA2_MSG_TYPE_APPLICATION)
+ {
+ sandesha2_seq_t *seq = NULL;
+ axis2_char_t *seq_id = NULL;
+ sandesha2_identifier_t *identifier = NULL;
+ 
+ seq = (sandesha2_seq_t*)
+ sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, 
+ env, SANDESHA2_MSG_PART_SEQ);
+ identifier = sandesha2_seq_get_identifier(seq, env);
+ seq_id = sandesha2_identifier_get_identifier(identifier, env); /* NOTE(review): seq_id is not used before its scope ends */
+ }
+ if(sandesha2_sender_worker_is_piggybackable_msg_type(sender_worker, env, 
+ msg_type) && AXIS2_FALSE == 
+ sandesha2_sender_worker_is_ack_already_piggybacked(sender_worker, env,
+ rm_msg_ctx))
+ {
+ sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx, 
+ storage_mgr);
+ }
+ /* Send step: commit the current transaction, mark msg_ctx as outside a transaction, then invoke the transport sender. */
+ 
+ transport_out = AXIS2_MSG_CTX_GET_TRANSPORT_OUT_DESC(msg_ctx, env);
+ transport_sender = AXIS2_TRANSPORT_OUT_DESC_GET_SENDER(transport_out, env);
+ if(transport_sender)
+ {
+ SANDESHA2_TRANSACTION_COMMIT(transaction, env);
+ property = axis2_property_create(env);
+ AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
+ AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
+ SANDESHA2_VALUE_FALSE, env));
+ AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, 
+ SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
+ /* Consider building soap envelope */
+ AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx);
+ successfully_sent = AXIS2_TRUE; /* NOTE(review): set without checking the result of AXIS2_TRANSPORT_SENDER_INVOKE */
+ sender_worker->counter++;
+ /*printf("**********************counter******************:%d\n", sender_worker->counter);
+ if(2 == sender_worker->counter)
+ sleep(300000);*/
+ 
+ }
+ transaction = sandesha2_storage_mgr_get_transaction(storage_mgr,
+ env); /* new transaction for the bookkeeping that follows */
+ property = axis2_property_create(env);
+ AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
+ AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
+ SANDESHA2_VALUE_TRUE, env));
+ AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, 
+ SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
+ msg_id = sandesha2_sender_bean_get_msg_id(sender_worker_bean, env);
+ bean1 = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id); /* retrieve the bean again by its message id */
+ if(bean1)
+ {
+ axis2_bool_t resend = AXIS2_FALSE;
+ /* Resend: copy sent count and time-to-send onto bean1 and update it. Otherwise remove the bean and the stored message context. */
+ resend = sandesha2_sender_bean_is_resend(sender_worker_bean, env);
+ if(resend)
+ {
+ sandesha2_sender_bean_set_sent_count(bean1, env, 
+ sandesha2_sender_bean_get_sent_count(sender_worker_bean, env));
+ sandesha2_sender_bean_set_time_to_send(bean1, env, 
+ sandesha2_sender_bean_get_time_to_send(sender_worker_bean, env));
+ sandesha2_sender_mgr_update(sender_mgr, env, bean1);
+ }
+ else
+ {
+ axis2_char_t *msg_stored_key = NULL;
+ 
+ msg_id = sandesha2_sender_bean_get_msg_id(bean1, env); 
+ sandesha2_sender_mgr_remove(sender_mgr, env, msg_id);
+ /* Removing the message from the storage */
+ msg_stored_key = sandesha2_sender_bean_get_msg_ctx_ref_key(
+ bean1, env);
+ sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env, 
+ msg_stored_key);
+ }
+ }
+ if(successfully_sent) /* after a send, a non-server-side msg_ctx is checked for a synchronous response */
+ {
+ if(AXIS2_FALSE == AXIS2_MSG_CTX_GET_SERVER_SIDE(msg_ctx, env))
+ sandesha2_sender_worker_check_for_sync_res(sender_worker, env, msg_ctx);
+ }
+ if(SANDESHA2_MSG_TYPE_TERMINATE_SEQ == sandesha2_msg_ctx_get_msg_type(
+ rm_msg_ctx, env))
+ {
+ sandesha2_terminate_seq_t *terminate_seq = NULL;
+ axis2_char_t *seq_id = NULL;
+ axis2_conf_ctx_t *conf_ctx = NULL;
+ axis2_char_t *int_seq_id = NULL;
+ /* Map the sequence id of the terminate message to its internal sequence id and terminate the sending side. */
+ terminate_seq = (sandesha2_terminate_seq_t*)
+ sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, env, 
+ SANDESHA2_MSG_PART_TERMINATE_SEQ);
+ seq_id = sandesha2_identifier_get_identifier(
+ sandesha2_terminate_seq_get_identifier(terminate_seq, 
+ env), env);
+ conf_ctx = AXIS2_MSG_CTX_GET_CONF_CTX(msg_ctx, env);
+ int_seq_id = sandesha2_utils_get_seq_property(env, seq_id, 
+ SANDESHA2_SEQ_PROP_INTERNAL_SEQ_ID, storage_mgr);
+ sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx,
+ int_seq_id, AXIS2_MSG_CTX_GET_SERVER_SIDE(msg_ctx, env), 
+ storage_mgr);
+ }
+ property = axis2_property_create(env); /* finally mark msg_ctx as outside a transaction again */
+ AXIS2_PROPERTY_SET_SCOPE(property, env, AXIS2_SCOPE_REQUEST);
+ AXIS2_PROPERTY_SET_VALUE(property, env, AXIS2_STRDUP(
+ SANDESHA2_VALUE_FALSE, env));
+ AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, 
+ SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
+ /* TODO make transaction handling effective */
+ if(transaction)
+ {
+ SANDESHA2_TRANSACTION_COMMIT(transaction, env);
+ }
+ #ifdef AXIS2_SVR_MULTI_THREADED
+ AXIS2_THREAD_POOL_EXIT_THREAD(env->thread_pool, thd);
+ #endif
+ return NULL;
+}
+ /* Tells whether acks may be piggybacked on a message of msg_type: AXIS2_FALSE for SANDESHA2_MSG_TYPE_ACK, AXIS2_TRUE otherwise. sender_worker is not used. */
+static axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_piggybackable_msg_type(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ int msg_type)
+{
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE); /* NOTE(review): a status constant is used as the failure value of a boolean function */
+ 
+ if(SANDESHA2_MSG_TYPE_ACK == msg_type)
+ return AXIS2_FALSE;
+ 
+ return AXIS2_TRUE;
+}
+ /* AXIS2_TRUE when rm_msg_ctx already contains a SANDESHA2_MSG_PART_SEQ_ACKNOWLEDGEMENT part, AXIS2_FALSE otherwise. sender_worker is not used. */
+static axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_ack_already_piggybacked(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ sandesha2_msg_ctx_t *rm_msg_ctx)
+{
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
+ /* A non-NULL message part means an acknowledgement is already present. */
+ if(sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, env, 
+ SANDESHA2_MSG_PART_SEQ_ACKNOWLEDGEMENT))
+ return AXIS2_TRUE;
+ 
+ return AXIS2_FALSE;
+}
+ /* Builds a response message context mirroring msg_ctx and, when a SOAP envelope is obtained, passes it to the engine. Returns AXIS2_SUCCESS, also when msg_ctx has no AXIS2_TRANSPORT_IN property. */
+static axis2_status_t AXIS2_CALL
+sandesha2_sender_worker_check_for_sync_res(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ axis2_msg_ctx_t *msg_ctx)
+{
+ axis2_property_t *property = NULL;
+ axis2_msg_ctx_t *res_msg_ctx = NULL;
+ axis2_op_ctx_t *req_op_ctx = NULL;
+ axiom_soap_envelope_t *res_envelope = NULL;
+ axis2_char_t *soap_ns_uri = NULL;
+ 
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
+ property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, AXIS2_TRANSPORT_IN,
+ AXIS2_FALSE);
+ if(!property)
+ return AXIS2_SUCCESS;
+ /* Create the response context with the same conf ctx and transport descriptions, then copy transport-in, service and service-group contexts. */
+ res_msg_ctx = axis2_msg_ctx_create(env, AXIS2_MSG_CTX_GET_CONF_CTX(msg_ctx,
+ env), AXIS2_MSG_CTX_GET_TRANSPORT_IN_DESC(
+ msg_ctx, env), AXIS2_MSG_CTX_GET_TRANSPORT_OUT_DESC(msg_ctx,
+ env));
+ AXIS2_MSG_CTX_SET_SERVER_SIDE(res_msg_ctx, env, AXIS2_FALSE);
+ AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env, AXIS2_TRANSPORT_IN,
+ AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, AXIS2_TRANSPORT_IN, 
+ AXIS2_FALSE), AXIS2_FALSE);
+ AXIS2_MSG_CTX_SET_SVC_CTX(res_msg_ctx, env, AXIS2_MSG_CTX_GET_SVC_CTX(
+ msg_ctx, env));
+ AXIS2_MSG_CTX_SET_SVC_GRP_CTX(res_msg_ctx, env, 
+ AXIS2_MSG_CTX_GET_SVC_GRP_CTX(msg_ctx, env));
+ req_op_ctx = AXIS2_MSG_CTX_GET_OP_CTX(msg_ctx, env);
+ if(req_op_ctx) /* copy content-type and char-set properties from the request operation context when present */
+ {
+ axis2_ctx_t *ctx = NULL;
+ 
+ ctx = AXIS2_OP_CTX_GET_BASE(req_op_ctx, env);
+ if(AXIS2_CTX_GET_PROPERTY(ctx, env, MTOM_RECIVED_CONTENT_TYPE, 
+ AXIS2_FALSE))
+ {
+ AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env, 
+ MTOM_RECIVED_CONTENT_TYPE, AXIS2_CTX_GET_PROPERTY(ctx, env, 
+ MTOM_RECIVED_CONTENT_TYPE, AXIS2_FALSE), AXIS2_FALSE);
+ }
+ if(AXIS2_CTX_GET_PROPERTY(ctx, env, AXIS2_HTTP_CHAR_SET_ENCODING, 
+ AXIS2_FALSE))
+ {
+ AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env, 
+ AXIS2_HTTP_CHAR_SET_ENCODING, AXIS2_CTX_GET_PROPERTY(ctx, env, 
+ AXIS2_HTTP_CHAR_SET_ENCODING, AXIS2_FALSE), AXIS2_FALSE);
+ }
+ }
+ AXIS2_MSG_CTX_SET_DOING_REST(res_msg_ctx, env, AXIS2_MSG_CTX_GET_DOING_REST(
+ msg_ctx, env));
+ soap_ns_uri = AXIS2_MSG_CTX_GET_IS_SOAP_11(msg_ctx, env) ?
+ AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
+ AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
+ /* Ask the HTTP transport utilities to create a SOAP message for msg_ctx in the namespace chosen above. */
+ res_envelope = axis2_http_transport_utils_create_soap_msg(env, msg_ctx,
+ soap_ns_uri);
+ 
+ AXIS2_MSG_CTX_SET_PROPERTY(res_msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION,
+ AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, 
+ SANDESHA2_WITHIN_TRANSACTION, AXIS2_FALSE), AXIS2_FALSE);
+ if(res_envelope)
+ {
+ axis2_engine_t *engine = NULL;
+ AXIS2_MSG_CTX_SET_SOAP_ENVELOPE(res_msg_ctx, env, res_envelope);
+ /* Fault envelopes go through AXIS2_ENGINE_RECEIVE_FAULT, all others through AXIS2_ENGINE_RECEIVE. */
+ engine = axis2_engine_create(env, AXIS2_MSG_CTX_GET_CONF_CTX(msg_ctx, 
+ env));
+ if(AXIS2_TRUE == sandesha2_sender_worker_is_fault_envelope(sender_worker, env, 
+ res_envelope))
+ AXIS2_ENGINE_RECEIVE_FAULT(engine, env, res_msg_ctx);
+ else
+ AXIS2_ENGINE_RECEIVE(engine, env, res_msg_ctx);
+ }
+ return AXIS2_SUCCESS; /* NOTE(review): res_msg_ctx and engine are not freed in this function - confirm ownership */
+}
+ /* AXIS2_TRUE when the body of soap_envelope holds a fault, AXIS2_FALSE otherwise. sender_worker is not used. */
+axis2_bool_t AXIS2_CALL
+sandesha2_sender_worker_is_fault_envelope(
+ sandesha2_sender_worker_t *sender_worker, 
+ const axis2_env_t *env, 
+ axiom_soap_envelope_t *soap_envelope)
+{
+ axiom_soap_fault_t *fault = NULL;
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ AXIS2_PARAM_CHECK(env->error, soap_envelope, AXIS2_FAILURE);
+ 
+ fault = AXIOM_SOAP_BODY_GET_FAULT(AXIOM_SOAP_ENVELOPE_GET_BODY(soap_envelope,
+ env), env); /* NOTE(review): the body pointer is passed on without a NULL check */
+ if(fault)
+ return AXIS2_TRUE;
+ 
+ return AXIS2_FALSE;
+}
+
+
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org