You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by sh...@apache.org on 2008/11/27 07:24:07 UTC
svn commit: r721090 - in /webservices/axis2/trunk/c/src/core/transport/amqp:
receiver/ receiver/qpid_receiver/request_processor/ sender/
sender/qpid_sender/ server/axis2_amqp_server/ util/
Author: shankar
Date: Wed Nov 26 22:24:07 2008
New Revision: 721090
URL: http://svn.apache.org/viewvc?rev=721090&view=rev
Log:
applying patch given in AXIS2C-1298
Modified:
webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c
webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c
webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c
webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp
webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h
webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp
webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h
webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c
webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h
webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c
webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c Wed Nov 26 22:24:07 2008
@@ -70,7 +70,7 @@
axutil_property_t* property = NULL;
const axis2_char_t* broker_ip = NULL;
int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
- *broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
+ *broker_port = AXIS2_QPID_NULL_CONF_INT;
receiver_resource_pack->conf_ctx_private = axis2_build_conf_ctx(env, repo);
if (!receiver_resource_pack->conf_ctx_private)
@@ -86,7 +86,7 @@
AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);
/* Set broker port */
- *broker_port = (qpid_broker_port != AXIS2_QPID_NULL_BROKER_PORT) ?
+ *broker_port = (qpid_broker_port != AXIS2_QPID_NULL_CONF_INT) ?
qpid_broker_port : AXIS2_QPID_DEFAULT_BROKER_PORT;
property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port);
axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env,
@@ -110,7 +110,7 @@
axutil_property_t* property = NULL;
const axis2_char_t* broker_ip = NULL;
int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
- *broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
+ *broker_port = AXIS2_QPID_NULL_CONF_INT;
AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
@@ -118,22 +118,26 @@
receiver_resource_pack->conf_ctx = conf_ctx;
/* Set broker IP */
- broker_ip = axis2_amqp_util_get_conf_value_string(in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_IP);
+ broker_ip = axis2_amqp_util_get_in_desc_conf_value_string(
+ in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_IP);
if (!broker_ip)
{
broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP;
}
- property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_ip);
+ property = axutil_property_create_with_args(
+ env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_ip);
axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);
/* Set broker port */
- *broker_port = axis2_amqp_util_get_conf_value_int(in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_PORT);
- if (!broker_port)
+ *broker_port = axis2_amqp_util_get_in_desc_conf_value_int(
+ in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_PORT);
+ if (*broker_port == AXIS2_QPID_NULL_CONF_INT)
{
*broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
}
- property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port);
+ property = axutil_property_create_with_args(
+ env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port);
axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property);
@@ -250,7 +254,7 @@
{
int status = AXIS2_SUCCESS;
- *inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_BROKER_PORT);
+ *inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_CONF_INT);
if (!(*inst))
{
status = AXIS2_FAILURE;
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c Wed Nov 26 22:24:07 2008
@@ -99,7 +99,6 @@
axis2_status_t status = AXIS2_FAILURE;
axutil_hash_t *binary_data_map = NULL;
axiom_soap_body_t *soap_body = NULL;
- axis2_endpoint_ref_t* reply_to = NULL;
axutil_property_t* reply_to_property = NULL;
/* Create msg_ctx */
@@ -307,13 +306,10 @@
/* SOAP version */
axis2_msg_ctx_set_is_soap_11(msg_ctx, env, is_soap_11);
- /* Set ReplyTo in the msg_ctx */
- reply_to = axis2_endpoint_ref_create(env, request_resource_pack->reply_to);
- axis2_msg_ctx_set_reply_to(msg_ctx, env, reply_to);
-
- /* Set ReplyTo in the msg_ctx as a property. This is useful when
- * server replies in the dual-channel case */
- reply_to_property = axutil_property_create_with_args(env, AXIS2_SCOPE_REQUEST, 0, 0,
+ /* Set ReplyTo in the msg_ctx as a property. This is used by the server when
+ * 1. WS-A is not in use
+ * 2. ReplyTo is an anonymous EPR - Sandesha2/Dual-channel */
+ reply_to_property = axutil_property_create_with_args(env, AXIS2_SCOPE_REQUEST, 0, 0,
(void*)request_resource_pack->reply_to);
axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO,
reply_to_property);
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c Wed Nov 26 22:24:07 2008
@@ -60,10 +60,27 @@
axis2_transport_out_desc_t* out_desc)
{
axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
- sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
+ axutil_property_t* property = NULL;
+ int* request_timeout = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
+ *request_timeout = AXIS2_QPID_NULL_CONF_INT;
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+
+ sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
sender_resource_pack->conf_ctx = conf_ctx;
+ /* Set request timeout */
+ *request_timeout = axis2_amqp_util_get_out_desc_conf_value_int(
+ out_desc, env, AXIS2_AMQP_CONF_QPID_REQUEST_TIMEOUT);
+ if (*request_timeout == AXIS2_QPID_NULL_CONF_INT)
+ {
+ *request_timeout = AXIS2_QPID_DEFAULT_REQUEST_TIMEOUT;
+ }
+ property = axutil_property_create_with_args(
+ env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)request_timeout);
+ axis2_conf_ctx_set_property(sender_resource_pack->conf_ctx, env,
+ AXIS2_AMQP_CONF_CTX_PROPERTY_REQUEST_TIMEOUT, property);
+
return AXIS2_SUCCESS;
}
@@ -81,7 +98,6 @@
axiom_xml_writer_t* xml_writer = NULL;
axiom_output_t* request_om_output = NULL;
axis2_char_t* request_content = NULL;
- axiom_soap_envelope_t* response_soap_envelope = NULL;
axis2_bool_t is_server = AXIS2_TRUE;
axis2_bool_t is_soap_11 = AXIS2_FALSE;
axutil_string_t* content_type = NULL;
@@ -171,71 +187,92 @@
return AXIS2_FAILURE;
}
- is_server = axis2_msg_ctx_get_server_side(msg_ctx, env);
+ is_server = axis2_amqp_util_msg_ctx_get_server_side(msg_ctx, env);
if (is_server)
{
- status = axis2_qpid_server_send(request_content, env,
+ status = axis2_qpid_send(request_content, env,
axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
}
else
{
- axis2_op_t* op = NULL;
- const axis2_char_t* mep = NULL;
-
- op = axis2_msg_ctx_get_op(msg_ctx, env);
- mep = axis2_op_get_msg_exchange_pattern(op, env);
-
- if (axutil_strcmp(mep, AXIS2_MEP_URI_OUT_ONLY) == 0 ||
- axutil_strcmp(mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY) == 0) /* One-way */
+ if (AXIS2_TRUE == axis2_amqp_util_msg_ctx_get_use_separate_listener(
+ msg_ctx, env)) /* Dual Channel */
{
- status = axis2_qpid_client_send_robust(request_content, env,
+ status = axis2_qpid_send(request_content, env,
axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
-
- /* Set Status Code in msg_ctx */
- axis2_msg_ctx_set_status_code(msg_ctx, env, status);
}
else
{
- if (AXIS2_TRUE == axis2_amqp_util_msg_ctx_get_use_separate_listener(msg_ctx, env)) /* Dual Channel */
+ axis2_op_t* op = NULL;
+ const axis2_char_t* mep = NULL;
+
+ op = axis2_msg_ctx_get_op(msg_ctx, env);
+
+ if (op)
+ {
+ mep = axis2_op_get_msg_exchange_pattern(op, env);
+ }
+
+ axis2_amqp_response_t* response = NULL;
+ response = axis2_qpid_send_receive(request_content, env,
+ axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+
+ if (response)
{
- axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
- sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
+ /* Create in stream */
+ if (response->data)
+ {
+ axutil_stream_t* in_stream = NULL;
+ axutil_property_t* property = NULL;
+
+ in_stream = axutil_stream_create_basic(env);
+ axutil_stream_write(in_stream, env, response->data,
+ response->length);
+
+ property = axutil_property_create(env);
+ axutil_property_set_scope(property, env, AXIS2_SCOPE_REQUEST);
+ axutil_property_set_free_func(property, env, axutil_stream_free_void_arg);
+ axutil_property_set_value(property, env, in_stream);
- if (!sender_resource_pack->conf_ctx)
- return AXIS2_FAILURE;
+ axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, property);
+ }
+
+ if (mep)
+ {
+ if (0 == axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN)) /* Out-In */
+ {
+ axiom_soap_envelope_t* response_soap_envelope = NULL;
- axis2_char_t* reply_to_queue_name = NULL;
- reply_to_queue_name = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(
- sender_resource_pack->conf_ctx, env);
-
- if (!reply_to_queue_name)
- return AXIS2_FAILURE;
+ response_soap_envelope = axis2_amqp_util_get_soap_envelope(response, env, msg_ctx);
+ if (response_soap_envelope)
+ {
+ axis2_msg_ctx_set_response_soap_envelope(msg_ctx, env, response_soap_envelope);
+ }
+ }
+ }
+
+ status = AXIS2_SUCCESS;
- status = axis2_qpid_client_send_dual(request_content, env, reply_to_queue_name,
- axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+ axis2_msg_ctx_set_status_code(msg_ctx, env, status);
+
+ axis2_amqp_response_free(response, env);
}
- else /* Single Channel */
+ else
{
- axis2_amqp_binary_data_buffer_t* binary_data_buffer = NULL;
- binary_data_buffer = axis2_qpid_client_send_receive(request_content, env,
- axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
-
- if (binary_data_buffer)
+ if (mep)
{
- response_soap_envelope = axis2_amqp_util_get_soap_envelope(binary_data_buffer, env, msg_ctx);
-
- if (response_soap_envelope)
- axis2_msg_ctx_set_response_soap_envelope(msg_ctx, env, response_soap_envelope);
-
- AXIS2_FREE(env->allocator, binary_data_buffer->data);
- AXIS2_FREE(env->allocator, binary_data_buffer->content_type);
- AXIS2_FREE(env->allocator, binary_data_buffer);
-
- status = AXIS2_SUCCESS;
+ if (axutil_strcmp(mep, AXIS2_MEP_URI_OUT_ONLY) == 0 ||
+ axutil_strcmp(mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY) == 0) /* One-way */
+ {
+ status = AXIS2_SUCCESS;
+
+ /* Set status code in msg_ctx */
+ axis2_msg_ctx_set_status_code(msg_ctx, env, status);
+ }
}
}
- }
+ }
}
if (content_type)
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp Wed Nov 26 22:24:07 2008
@@ -15,12 +15,19 @@
* limitations under the License.
*/
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/sys/Time.h>
#include <axis2_amqp_defines.h>
#include <axiom_mime_part.h>
-#include <fstream>
#include <axis2_qpid_sender.h>
+#include <fstream>
using namespace std;
+using namespace qpid::client;
+using namespace qpid::framing;
Axis2QpidSender::Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env)
{
@@ -29,19 +36,15 @@
this->env = env;
this->responseContent = "";
this->responseContentType = "";
- this->subscriptionManager = NULL;
}
Axis2QpidSender::~Axis2QpidSender(void)
-{
- if (subscriptionManager)
- delete subscriptionManager;
-}
+{}
-bool Axis2QpidSender::ClientSendReceive(string messageContent, string toQueueName, bool isSOAP11,
- string contentType, string soapAction, axutil_array_list_t* mime_parts)
+bool Axis2QpidSender::SendReceive(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts, int timeout)
{
bool status = false;
this->responseContent = "";
@@ -64,13 +67,13 @@
arg::bindingKey = replyToQueueName);
/* Create Message */
- Message message;
+ Message reqMessage;
- message.getDeliveryProperties().setRoutingKey(toQueueName);
- message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
+ reqMessage.getDeliveryProperties().setRoutingKey(toQueueName);
+ reqMessage.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
- message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
- message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
+ reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
+ reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
if (mime_parts)
{
@@ -82,77 +85,25 @@
messageContent.append(mimeBody);
}
- message.setData(messageContent);
+ reqMessage.setData(messageContent);
- async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
-
- /* Create Subscription manager */
- subscriptionManager = new SubscriptionManager(session);
-
- subscriptionManager->subscribe(*this, replyToQueueName);
- subscriptionManager->run();
-
- /* Current thread gets bloked here until the response hits the message listener */
-
- connection.close();
-
- status = true;
- }
- catch (const std::exception& e)
- {
- }
-
- return status;
-}
-
-
-bool Axis2QpidSender::ClientSendRobust(string messageContent, string toQueueName, bool isSOAP11,
- string contentType, string soapAction, axutil_array_list_t* mime_parts)
-{
- bool status = false;
-
- try
- {
- Connection connection;
- connection.open(qpidBrokerIP, qpidBrokerPort);
+ async(session).messageTransfer(arg::content = reqMessage, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
- Session session = connection.newSession();
-
- /* Declare Private Queue */
- string replyToQueueName = AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX;
- replyToQueueName.append(axutil_uuid_gen(env));
-
- session.queueDeclare(arg::queue = replyToQueueName);
- session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
- arg::queue = replyToQueueName,
- arg::bindingKey = replyToQueueName);
-
- /* Create Message */
- Message message;
-
- message.getDeliveryProperties().setRoutingKey(toQueueName);
- message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
+ /* Create subscription manager */
+ SubscriptionManager subscriptionManager(session);
- message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
- message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
+ Message resMessage;
+ qpid::sys::Duration reqTimeout(timeout * AXIS2_AMQP_NANOSEC_PER_MILLISEC);
- if (mime_parts)
+ if (subscriptionManager.get(resMessage, replyToQueueName, reqTimeout))
{
- string mimeBody;
- GetMimeBody(mime_parts, mimeBody);
+ responseContent = resMessage.getData();
+ responseContentType = resMessage.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
- messageContent.clear();/* MIME parts include SOAP envelop */
-
- messageContent.append(mimeBody);
+ status = true;
}
- message.setData(messageContent);
-
- async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
-
connection.close();
-
- status = true;
}
catch (const std::exception& e)
{
@@ -162,7 +113,7 @@
}
-bool Axis2QpidSender::ClientSendDual(string messageContent, string toQueueName, string replyToQueueName,
+bool Axis2QpidSender::Send(string messageContent, string toQueueName, string replyToQueueName,
bool isSOAP11, string contentType, string soapAction, axutil_array_list_t* mime_parts)
{
bool status = false;
@@ -174,62 +125,20 @@
Session session = connection.newSession();
- session.queueDeclare(arg::queue = replyToQueueName);
- session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
- arg::queue = replyToQueueName,
- arg::bindingKey = replyToQueueName);
-
/* Create Message */
Message message;
message.getDeliveryProperties().setRoutingKey(toQueueName);
- message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
-
- message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
- message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
- if (mime_parts)
+ if (!replyToQueueName.empty()) /* Client dual-channel */
{
- string mimeBody;
- GetMimeBody(mime_parts, mimeBody);
-
- messageContent.clear();/* MIME parts include SOAP envelop */
+ message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
- messageContent.append(mimeBody);
+ session.queueDeclare(arg::queue = replyToQueueName);
+ session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
+ arg::queue = replyToQueueName,
+ arg::bindingKey = replyToQueueName);
}
-
- message.setData(messageContent);
-
- async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
-
- connection.close();
-
- status = true;
- }
- catch (const std::exception& e)
- {
- }
-
- return status;
-}
-
-
-bool Axis2QpidSender::ServerSend(string messageContent, string toQueueName, bool isSOAP11,
- string contentType, string soapAction, axutil_array_list_t* mime_parts)
-{
- bool status = false;
-
- try
- {
- Connection connection;
- connection.open(qpidBrokerIP, qpidBrokerPort);
-
- Session session = connection.newSession();
-
- /* Create Message */
- Message message;
-
- message.getDeliveryProperties().setRoutingKey(toQueueName);
message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
@@ -259,15 +168,6 @@
return status;
}
-void Axis2QpidSender::received(Message& message)
-{
- responseContent = message.getData();
- responseContentType = message.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
-
- if (subscriptionManager)
- subscriptionManager->cancel(message.getDestination());
-}
-
void Axis2QpidSender::GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody)
{
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h Wed Nov 26 22:24:07 2008
@@ -18,51 +18,32 @@
#ifndef AXIS2_QPID_SENDER_H
#define AXIS2_QPID_SENDER_H
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/Message.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/SubscriptionManager.h>
#include <axis2_util.h>
-
#include <sstream>
#include <string>
-using namespace qpid::client;
-using namespace qpid::framing;
-using std::stringstream;
using std::string;
-class Axis2QpidSender : public MessageListener
+class Axis2QpidSender
{
public:
Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env);
~Axis2QpidSender(void);
- /* Client */
- bool ClientSendReceive(string messageContent, string toQueueName, bool isSOAP11,
- string contentType, string soapAction, axutil_array_list_t* mime_parts);
- bool ClientSendRobust(string messageContent, string toQueueName, bool isSOAP11,
- string contentType, string soapAction, axutil_array_list_t* mime_parts);
- bool ClientSendDual(string messageContent, string toQueueName, string replyToQueueName, bool isSOAP11,
+ bool SendReceive(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts, int timeout);
+ bool Send(string messageContent, string toQueueName, string replyToQueueName, bool isSOAP11,
string contentType, string soapAction, axutil_array_list_t* mime_parts);
- /* Server */
- bool ServerSend(string messageContent, string toQueueName, bool isSOAP11,
- string contentType, string soapAction, axutil_array_list_t* mime_parts);
-
string responseContent;
string responseContentType;
private:
- virtual void received(Message& message);
-
void GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody);
string qpidBrokerIP;
int qpidBrokerPort;
const axutil_env_t* env;
- SubscriptionManager* subscriptionManager;
};
#endif
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp Wed Nov 26 22:24:07 2008
@@ -24,8 +24,8 @@
{
#endif
-AXIS2_EXTERN axis2_amqp_binary_data_buffer_t* AXIS2_CALL
-axis2_qpid_client_send_receive(
+AXIS2_EXTERN axis2_amqp_response_t* AXIS2_CALL
+axis2_qpid_send_receive(
const axis2_char_t* request_content,
const axutil_env_t* env,
const axis2_char_t* content_type,
@@ -33,7 +33,6 @@
axis2_msg_ctx_t* msg_ctx)
{
axis2_amqp_destination_info_t* destination_info = NULL;
-
destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
if (!destination_info || !destination_info->broker_ip ||
@@ -44,45 +43,45 @@
axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
+ int timeout = axis2_amqp_util_msg_ctx_get_request_timeout(msg_ctx, env);
/* Get Response */
Axis2QpidSender qpid_sender(destination_info->broker_ip,
destination_info->broker_port, env);
- bool status = qpid_sender.ClientSendReceive(request_content, destination_info->queue_name,
- is_soap_11, content_type, soap_action, mime_parts);
+
+ bool status = qpid_sender.SendReceive(request_content, destination_info->queue_name,
+ is_soap_11, content_type, soap_action, mime_parts, timeout);
- AXIS2_FREE(env->allocator, destination_info->broker_ip);
- AXIS2_FREE(env->allocator, destination_info->queue_name);
- AXIS2_FREE(env->allocator, destination_info);
+ axis2_amqp_destination_info_free(destination_info, env);
if (!status)
{
return NULL;
}
- /* Create a Copy and Return */
- axis2_amqp_binary_data_buffer_t* binary_data_buffer = (axis2_amqp_binary_data_buffer_t*)
- AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_binary_data_buffer_t));
+ /* Create response */
+ axis2_amqp_response_t* response = (axis2_amqp_response_t*)AXIS2_MALLOC(
+ env->allocator, sizeof(axis2_amqp_response_t));
/* Data */
- binary_data_buffer->data = AXIS2_MALLOC(env->allocator, qpid_sender.responseContent.size());
- memcpy(binary_data_buffer->data, qpid_sender.responseContent.c_str(),
+ response->data = AXIS2_MALLOC(env->allocator, qpid_sender.responseContent.size());
+ memcpy(response->data, qpid_sender.responseContent.c_str(),
qpid_sender.responseContent.size());
/* Length */
- binary_data_buffer->length = qpid_sender.responseContent.size();
+ response->length = qpid_sender.responseContent.size();
/* ContentType */
- binary_data_buffer->content_type = (axis2_char_t*)
- AXIS2_MALLOC(env->allocator, qpid_sender.responseContentType.size() + 1);
- strcpy(binary_data_buffer->content_type, qpid_sender.responseContentType.c_str());
+ response->content_type = (axis2_char_t*)AXIS2_MALLOC(
+ env->allocator, qpid_sender.responseContentType.size() + 1);
+ strcpy(response->content_type, qpid_sender.responseContentType.c_str());
- return binary_data_buffer;
+ return response;
}
AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_robust(
+axis2_qpid_send(
const axis2_char_t* request_content,
const axutil_env_t* env,
const axis2_char_t* content_type,
@@ -91,6 +90,7 @@
{
axis2_amqp_destination_info_t* destination_info = NULL;
axis2_status_t status = AXIS2_FAILURE;
+ string reply_to_queue_name = "";
destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
@@ -102,89 +102,25 @@
axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
-
- Axis2QpidSender qpid_sender(destination_info->broker_ip,
- destination_info->broker_port, env);
-
- status = qpid_sender.ClientSendRobust(request_content, destination_info->queue_name,
- is_soap_11, content_type, soap_action, mime_parts);
- AXIS2_FREE(env->allocator, destination_info->broker_ip);
- AXIS2_FREE(env->allocator, destination_info->queue_name);
- AXIS2_FREE(env->allocator, destination_info);
-
- return status;
-}
-
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_dual(
- const axis2_char_t* request_content,
- const axutil_env_t* env,
- const axis2_char_t* reply_to_queue_name,
- const axis2_char_t* content_type,
- const axis2_char_t* soap_action,
- axis2_msg_ctx_t* msg_ctx)
-{
- axis2_amqp_destination_info_t* destination_info = NULL;
- axis2_status_t status = AXIS2_FAILURE;
-
- destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
-
- if (!destination_info || !destination_info->broker_ip ||
- !destination_info->broker_port || !destination_info->queue_name)
+ /* If client side, find reply_to_queue_name */
+ if (!axis2_msg_ctx_get_server_side(msg_ctx, env))
{
- return AXIS2_FAILURE;
- }
-
- axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
- axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
-
- Axis2QpidSender qpid_sender(destination_info->broker_ip,
- destination_info->broker_port, env);
-
- status = qpid_sender.ClientSendDual(request_content, destination_info->queue_name,
- reply_to_queue_name, is_soap_11, content_type, soap_action, mime_parts);
-
- AXIS2_FREE(env->allocator, destination_info->broker_ip);
- AXIS2_FREE(env->allocator, destination_info->queue_name);
- AXIS2_FREE(env->allocator, destination_info);
+ axis2_conf_ctx_t* conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
- return status;
-}
-
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_server_send(
- const axis2_char_t* request_content,
- const axutil_env_t* env,
- const axis2_char_t* content_type,
- const axis2_char_t* soap_action,
- axis2_msg_ctx_t* msg_ctx)
-{
- axis2_amqp_destination_info_t* destination_info = NULL;
- axis2_status_t status = AXIS2_FAILURE;
-
- destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
-
- if (!destination_info || !destination_info->broker_ip ||
- !destination_info->broker_port || !destination_info->queue_name)
- {
- return AXIS2_FAILURE;
+ axis2_char_t* queue_name =
+ axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx, env);
+ if (queue_name)
+ reply_to_queue_name = queue_name;
}
- axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
- axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
-
Axis2QpidSender qpid_sender(destination_info->broker_ip,
destination_info->broker_port, env);
- status = qpid_sender.ServerSend(request_content, destination_info->queue_name,
- is_soap_11, content_type, soap_action, mime_parts);
+ status = qpid_sender.Send(request_content, destination_info->queue_name,
+ reply_to_queue_name, is_soap_11, content_type, soap_action, mime_parts);
- AXIS2_FREE(env->allocator, destination_info->broker_ip);
- AXIS2_FREE(env->allocator, destination_info->queue_name);
- AXIS2_FREE(env->allocator, destination_info);
+ axis2_amqp_destination_info_free(destination_info, env);
return status;
}
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h Wed Nov 26 22:24:07 2008
@@ -27,8 +27,8 @@
{
#endif
-AXIS2_EXTERN axis2_amqp_binary_data_buffer_t* AXIS2_CALL
-axis2_qpid_client_send_receive(
+AXIS2_EXTERN axis2_amqp_response_t* AXIS2_CALL
+axis2_qpid_send_receive(
const axis2_char_t* request_content,
const axutil_env_t* env,
const axis2_char_t* content_type,
@@ -36,24 +36,7 @@
axis2_msg_ctx_t* msg_ctx);
AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_robust(
- const axis2_char_t* request_content,
- const axutil_env_t* env,
- const axis2_char_t* content_type,
- const axis2_char_t* soap_action,
- axis2_msg_ctx_t* msg_ctx);
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_dual(
- const axis2_char_t* request_content,
- const axutil_env_t* env,
- const axis2_char_t* reply_to_queue_name,
- const axis2_char_t* content_type,
- const axis2_char_t* soap_action,
- axis2_msg_ctx_t* msg_ctx);
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_server_send(
+axis2_qpid_send(
const axis2_char_t* request_content,
const axutil_env_t* env,
const axis2_char_t* content_type,
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c Wed Nov 26 22:24:07 2008
@@ -133,7 +133,7 @@
extern int optopt;
int c;
const axis2_char_t* qpid_broker_ip = NULL;
- int qpid_broker_port = AXIS2_QPID_NULL_BROKER_PORT;
+ int qpid_broker_port = AXIS2_QPID_NULL_CONF_INT;
const axis2_char_t* repo_path = AXIS2_AMQP_SERVER_REPO_PATH;
axutil_log_levels_t log_level = AXIS2_LOG_LEVEL_DEBUG;
const axis2_char_t* log_file_name = AXIS2_AMQP_SERVER_LOG_FILE_NAME;
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h Wed Nov 26 22:24:07 2008
@@ -24,13 +24,16 @@
#define AXIS2_AMQP_CONF_QPID_BROKER_IP "qpid_broker_ip"
#define AXIS2_AMQP_CONF_QPID_BROKER_PORT "qpid_broker_port"
+#define AXIS2_AMQP_CONF_QPID_REQUEST_TIMEOUT "request_timeout"
#define AXIS2_QPID_DEFAULT_BROKER_IP "127.0.0.1"
#define AXIS2_QPID_DEFAULT_BROKER_PORT 5672
-#define AXIS2_QPID_NULL_BROKER_PORT 0
+#define AXIS2_QPID_DEFAULT_REQUEST_TIMEOUT 500
+#define AXIS2_QPID_NULL_CONF_INT -1
#define AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP "qpid_broker_ip"
#define AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT "qpid_broker_port"
+#define AXIS2_AMQP_CONF_CTX_PROPERTY_REQUEST_TIMEOUT "request_timeout"
#define AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME "queue_name"
#define AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO "qpid_reply_to"
@@ -51,10 +54,12 @@
#define AXIS2_AMQP_EPR_SERVICE_PREFIX "services"
#define AXIS2_AMQP_EPR_ANON_SERVICE_NAME "__ANONYMOUS_SERVICE__"
-#define AXIS2_AMQP_EQ '='
-#define AXIS2_AMQP_SEMI_COLON ';'
-#define AXIS2_AMQP_ESC_NULL '\0'
-#define AXIS2_AMQP_DOUBLE_QUOTE '"'
-#define AXIS2_AMQP_B_SLASH '\\'
+#define AXIS2_AMQP_EQ '='
+#define AXIS2_AMQP_SEMI_COLON ';'
+#define AXIS2_AMQP_ESC_NULL '\0'
+#define AXIS2_AMQP_DOUBLE_QUOTE '"'
+#define AXIS2_AMQP_B_SLASH '\\'
+
+#define AXIS2_AMQP_NANOSEC_PER_MILLISEC 1000*1000
#endif
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c Wed Nov 26 22:24:07 2008
@@ -24,7 +24,7 @@
#include <axis2_amqp_util.h>
AXIS2_EXTERN axis2_char_t* AXIS2_CALL
-axis2_amqp_util_get_conf_value_string(
+axis2_amqp_util_get_in_desc_conf_value_string(
axis2_transport_in_desc_t* in_desc,
const axutil_env_t* env,
const axis2_char_t* param_name)
@@ -47,15 +47,15 @@
AXIS2_EXTERN int AXIS2_CALL
-axis2_amqp_util_get_conf_value_int(
+axis2_amqp_util_get_in_desc_conf_value_int(
axis2_transport_in_desc_t* in_desc,
const axutil_env_t* env,
const axis2_char_t* param_name)
{
axis2_char_t* value_str = NULL;
- int value = 0;
+ int value = AXIS2_QPID_NULL_CONF_INT;
- value_str = axis2_amqp_util_get_conf_value_string(
+ value_str = axis2_amqp_util_get_in_desc_conf_value_string(
in_desc, env, param_name);
if (value_str)
{
@@ -66,9 +66,52 @@
}
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+axis2_amqp_util_get_out_desc_conf_value_string(
+ axis2_transport_out_desc_t* out_desc,
+ const axutil_env_t* env,
+ const axis2_char_t* param_name)
+{
+ axutil_param_t* param = NULL;
+ axis2_char_t* value = NULL;
+
+ param = (axutil_param_t*)
+ axutil_param_container_get_param(
+ axis2_transport_out_desc_param_container(out_desc, env),
+ env,
+ param_name);
+ if (param)
+ {
+ value = axutil_param_get_value(param, env);
+ }
+
+ return value;
+}
+
+
+AXIS2_EXTERN int AXIS2_CALL
+axis2_amqp_util_get_out_desc_conf_value_int(
+ axis2_transport_out_desc_t* out_desc,
+ const axutil_env_t* env,
+ const axis2_char_t* param_name)
+{
+ axis2_char_t* value_str = NULL;
+ int value = AXIS2_QPID_NULL_CONF_INT;
+
+ value_str = axis2_amqp_util_get_out_desc_conf_value_string(
+ out_desc, env, param_name);
+ if (value_str)
+ {
+ value = atoi(value_str);
+ }
+
+ return value;
+}
+
+
AXIS2_EXTERN axiom_soap_envelope_t* AXIS2_CALL
axis2_amqp_util_get_soap_envelope(
- axis2_amqp_binary_data_buffer_t* binary_data_buffer,
+ axis2_amqp_response_t* response,
const axutil_env_t* env,
axis2_msg_ctx_t* msg_ctx)
{
@@ -83,7 +126,7 @@
axutil_hash_t *binary_data_map = NULL;
axis2_bool_t is_soap_11 = AXIS2_FALSE;
- if (!binary_data_buffer || !binary_data_buffer->data || !binary_data_buffer->content_type)
+ if (!response || !response->data || !response->content_type)
{
return NULL;
}
@@ -91,10 +134,10 @@
is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
/* Handle MTOM */
- if (strstr(binary_data_buffer->content_type, AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED))
+ if (strstr(response->content_type, AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED))
{
axis2_char_t* mime_boundary = axis2_amqp_util_get_value_from_content_type(env,
- binary_data_buffer->content_type,
+ response->content_type,
AXIS2_AMQP_HEADER_CONTENT_TYPE_MIME_BOUNDARY);
if (mime_boundary)
@@ -165,12 +208,12 @@
if (stream)
{
- axutil_stream_write(stream, env, binary_data_buffer->data,
- binary_data_buffer->length);
+ axutil_stream_write(stream, env, response->data,
+ response->length);
callback_ctx->env = env;
callback_ctx->in_stream = stream;
- callback_ctx->content_length = binary_data_buffer->length;
- callback_ctx->unread_len = binary_data_buffer->length;
+ callback_ctx->content_length = response->length;
+ callback_ctx->unread_len = response->length;
callback_ctx->chunked_stream = NULL;
}
@@ -203,8 +246,8 @@
}
else
{
- soap_body_str = binary_data_buffer->data;
- soap_body_len = axutil_strlen(binary_data_buffer->data);
+ soap_body_str = response->data;
+ soap_body_len = axutil_strlen(response->data);
}
soap_body_len = axutil_strlen(soap_body_str);
@@ -403,7 +446,7 @@
axutil_property_t* property = NULL;
axis2_char_t* queue_name = NULL;
axis2_char_t* value = NULL;
-
+
/* Get property */
property = axis2_conf_ctx_get_property(conf_ctx, env,
AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME);
@@ -429,7 +472,7 @@
AXIS2_MALLOC(env->allocator, axutil_strlen(value) + 1);
strcpy(queue_name, value);
- axutil_property_set_value(property, env, NULL);
+ /*axutil_property_set_value(property, env, NULL);*/
}
else
{
@@ -451,15 +494,21 @@
const axutil_env_t* env)
{
axutil_property_t* property = NULL;
- axis2_char_t* broker_ip = NULL;
+ void* value = NULL;
+ axis2_char_t* broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP;
property = axis2_conf_ctx_get_property(conf_ctx, env,
AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP);
- if (!property)
- return NULL;
+ if (property)
+ {
+ value = axutil_property_get_value(property, env);
- broker_ip = (axis2_char_t*)axutil_property_get_value(property, env);
+ if (value)
+ {
+ broker_ip = (axis2_char_t*)value;
+ }
+ }
return broker_ip;
}
@@ -472,7 +521,7 @@
{
axutil_property_t* property = NULL;
void* value = NULL;
- int broker_port = 0;
+ int broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
property = axis2_conf_ctx_get_property(conf_ctx, env,
AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT);
@@ -505,7 +554,7 @@
if (property)
{
value = (axis2_char_t*)axutil_property_get_value(property, env);
-
+
if (value && (axutil_strcmp(AXIS2_VALUE_TRUE, value) == 0))
{
use_separate_listener = AXIS2_TRUE;
@@ -528,83 +577,85 @@
* 3. TempQueue... */
axis2_endpoint_ref_t* endpoint_ref = NULL;
- const axis2_char_t* endpoint_address_original = NULL;
- axis2_char_t* endpoint_address = NULL;
axis2_amqp_destination_info_t* destination_info = NULL;
- char* substr = NULL;
- char* token = NULL;
-
- endpoint_ref = axis2_msg_ctx_get_to(msg_ctx, env);
- if (!endpoint_ref)
- return NULL;
-
- endpoint_address_original = axis2_endpoint_ref_get_address(endpoint_ref, env);
- if (!endpoint_address_original)
- return NULL;
-
- endpoint_address = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
- (sizeof(axis2_char_t) * axutil_strlen(endpoint_address_original)) + 1);
- strcpy((char*)endpoint_address, (char*)endpoint_address_original);
destination_info = (axis2_amqp_destination_info_t*)
AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_destination_info_t));
destination_info->broker_ip = NULL;
- destination_info->broker_port = AXIS2_QPID_NULL_BROKER_PORT;
+ destination_info->broker_port = AXIS2_QPID_NULL_CONF_INT;
destination_info->queue_name = NULL;
-
- if ((substr = strstr(endpoint_address, AXIS2_AMQP_EPR_PREFIX))) /* Start with amqp: */
+
+ endpoint_ref = axis2_msg_ctx_get_to(msg_ctx, env);
+
+ if (endpoint_ref)
{
- if (strstr(endpoint_address, AXIS2_AMQP_EPR_ANON_SERVICE_NAME))
- {
- /* Server reply to dual-channel client */
- axutil_property_t* property = NULL;
- property = axis2_msg_ctx_get_property(msg_ctx, env,
- AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
+ const axis2_char_t* endpoint_address_original = NULL;
+ axis2_char_t* endpoint_address = NULL;
+ char* substr = NULL;
+ char* token = NULL;
+ endpoint_address_original = axis2_endpoint_ref_get_address(endpoint_ref, env);
+
+ if (!endpoint_address_original)
+ return NULL;
- if (property)
- {
- axis2_char_t* queue_name = (axis2_char_t*)
- axutil_property_get_value(property, env);
+ endpoint_address = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ (sizeof(axis2_char_t) * axutil_strlen(endpoint_address_original)) + 1);
+ strcpy((char*)endpoint_address, (char*)endpoint_address_original);
+
+ if ((substr = strstr(endpoint_address, AXIS2_AMQP_EPR_PREFIX))) /* Start with amqp: */
+ {
+ if (strstr(endpoint_address, AXIS2_AMQP_EPR_ANON_SERVICE_NAME))
+ {
+ /* Server reply to dual-channel client */
+ axutil_property_t* property = NULL;
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
- if (queue_name)
+ if (property)
{
- destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
- env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
- strcpy(destination_info->queue_name, queue_name);
+ axis2_char_t* queue_name = (axis2_char_t*)
+ axutil_property_get_value(property, env);
+
+ if (queue_name)
+ {
+ destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
+ env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
+ strcpy(destination_info->queue_name, queue_name);
+ }
}
- }
- }
- else
- {
- substr+= strlen(AXIS2_AMQP_EPR_PREFIX) + 2; /* 2 -> "//" */
- if (substr) /* IP:PORT/services/SERVICE_NAME */
+ }
+ else
{
- token = strtok(substr, ":");
- if (token) /* IP */
+ substr+= strlen(AXIS2_AMQP_EPR_PREFIX) + 2; /* 2 -> "//" */
+ if (substr) /* IP:PORT/services/SERVICE_NAME */
{
- axis2_char_t* broker_ip = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
- (sizeof(axis2_char_t) * strlen(token)) + 1);
- strcpy(broker_ip, token);
- destination_info->broker_ip = broker_ip;
-
- token = strtok(NULL, "/"); /* PORT */
- if (token)
+ token = strtok(substr, ":");
+ if (token) /* IP */
{
- destination_info->broker_port = atoi(token);
+ axis2_char_t* broker_ip = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ (sizeof(axis2_char_t) * strlen(token)) + 1);
+ strcpy(broker_ip, token);
+ destination_info->broker_ip = broker_ip;
- token = strtok(NULL, "#"); /* ... services/SERVICE_NAME */
+ token = strtok(NULL, "/"); /* PORT */
if (token)
{
- if ((substr = strstr(token, AXIS2_AMQP_EPR_SERVICE_PREFIX)))
+ destination_info->broker_port = atoi(token);
+
+ token = strtok(NULL, "#"); /* ... services/SERVICE_NAME */
+ if (token)
{
- substr+= strlen(AXIS2_AMQP_EPR_SERVICE_PREFIX) + 1; /* 1 -> "/" */
- if (substr)
+ if ((substr = strstr(token, AXIS2_AMQP_EPR_SERVICE_PREFIX)))
{
- axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
- (sizeof(axis2_char_t) * strlen(substr)) + 1);
- strcpy(queue_name, substr);
- destination_info->queue_name = queue_name;
+ substr+= strlen(AXIS2_AMQP_EPR_SERVICE_PREFIX) + 1; /* 1 -> "/" */
+ if (substr)
+ {
+ axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ (sizeof(axis2_char_t) * strlen(substr)) + 1);
+ strcpy(queue_name, substr);
+ destination_info->queue_name = queue_name;
+ }
}
}
}
@@ -612,10 +663,36 @@
}
}
}
+ else if (0 == strcmp(endpoint_address, AXIS2_WSA_ANONYMOUS_URL)) /* Required to work with Sandesha2 */
+ {
+ axutil_property_t* property = NULL;
+ property = axis2_msg_ctx_get_property(msg_ctx, env,
+ AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
+
+ if (property)
+ {
+ axis2_char_t* queue_name = (axis2_char_t*)
+ axutil_property_get_value(property, env);
+
+ if (queue_name)
+ {
+ destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
+ env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
+ strcpy(destination_info->queue_name, queue_name);
+ }
+ }
+ }
+ else if ((substr = strstr(endpoint_address, "jms:/")) &&
+ (substr == endpoint_address))
+ {
+
+ }
+
+ AXIS2_FREE(env->allocator, endpoint_address);
}
- else if (0 == strcmp(endpoint_address, AXIS2_WSA_ANONYMOUS_URL)) /* Required to work with Sandesha2 */
+ else
{
- /* Server reply to dual-channel client */
+ /* Single-channel blocking */
axutil_property_t* property = NULL;
property = axis2_msg_ctx_get_property(msg_ctx, env,
AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
@@ -624,7 +701,7 @@
{
axis2_char_t* queue_name = (axis2_char_t*)
axutil_property_get_value(property, env);
-
+
if (queue_name)
{
destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
@@ -633,25 +710,6 @@
}
}
}
- else if ((substr = strstr(endpoint_address, "jms:/")) &&
- (substr == endpoint_address))
- {
-
- }
- else if ((substr = strstr(endpoint_address, "TempQueue")) &&
- (substr == endpoint_address))
- {
- axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
- (sizeof(axis2_char_t) * strlen(endpoint_address)) + 1);
- strcpy(queue_name, endpoint_address);
- destination_info->queue_name = queue_name;
- }
- else
- {
- AXIS2_FREE(env->allocator, destination_info);
- AXIS2_FREE(env->allocator, endpoint_address);
- return NULL;
- }
/* Get broker IP/Port from conf_ctx if they are not
* found in the destination URI */
@@ -682,7 +740,7 @@
}
}
- if (AXIS2_QPID_NULL_BROKER_PORT == destination_info->broker_port)
+ if (AXIS2_QPID_NULL_CONF_INT == destination_info->broker_port)
{
axis2_conf_ctx_t* conf_ctx = NULL;
@@ -705,7 +763,103 @@
}
}
- AXIS2_FREE(env->allocator, endpoint_address);
-
return destination_info;
}
+
+
+AXIS2_EXTERN int AXIS2_CALL
+axis2_amqp_util_msg_ctx_get_request_timeout(
+ axis2_msg_ctx_t* msg_ctx,
+ const axutil_env_t* env)
+{
+ axis2_conf_ctx_t* conf_ctx = NULL;
+ axutil_property_t* property = NULL;
+ void* value = NULL;
+ int request_timeout = AXIS2_QPID_DEFAULT_REQUEST_TIMEOUT;
+
+ conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+
+ if (conf_ctx)
+ {
+ property = axis2_conf_ctx_get_property(conf_ctx, env,
+ AXIS2_AMQP_CONF_CTX_PROPERTY_REQUEST_TIMEOUT);
+
+ if (property)
+ {
+ value = axutil_property_get_value(property, env);
+
+ if (value)
+ {
+ request_timeout = *(int*)value;
+ }
+ }
+ }
+
+ return request_timeout;
+}
+
+
+AXIS2_EXTERN axis2_bool_t AXIS2_CALL
+axis2_amqp_util_msg_ctx_get_server_side(
+ axis2_msg_ctx_t* msg_ctx,
+ const axutil_env_t* env)
+{
+ axis2_conf_ctx_t* conf_ctx = NULL;
+ axis2_bool_t is_server = AXIS2_FALSE;
+
+ conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+
+ if (conf_ctx)
+ {
+ is_server =
+ axis2_amqp_util_conf_ctx_get_server_side(
+ conf_ctx, env);
+ }
+
+ return is_server;
+}
+
+
+AXIS2_EXTERN void AXIS2_CALL
+axis2_amqp_response_free(
+ axis2_amqp_response_t* response,
+ const axutil_env_t* env)
+{
+ if (response)
+ {
+ if (response->data)
+ {
+ AXIS2_FREE(env->allocator, response->data);
+ }
+
+ if (response->content_type)
+ {
+ AXIS2_FREE(env->allocator, response->content_type);
+ }
+
+ AXIS2_FREE(env->allocator, response);
+ }
+}
+
+
+AXIS2_EXTERN void AXIS2_CALL
+axis2_amqp_destination_info_free(
+ axis2_amqp_destination_info_t* destination_info,
+ const axutil_env_t* env)
+{
+ if (destination_info)
+ {
+ if (destination_info->broker_ip)
+ {
+ AXIS2_FREE(env->allocator, destination_info->broker_ip);
+ }
+
+ if (destination_info->queue_name)
+ {
+ AXIS2_FREE(env->allocator, destination_info->queue_name);
+ }
+
+ AXIS2_FREE(env->allocator, destination_info);
+ }
+}
+
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h Wed Nov 26 22:24:07 2008
@@ -30,12 +30,12 @@
{
#endif
- typedef struct axis2_amqp_binary_data_buffer
+ typedef struct axis2_amqp_response
{
void* data;
int length;
axis2_char_t* content_type;
- } axis2_amqp_binary_data_buffer_t;
+ } axis2_amqp_response_t;
typedef struct axis2_amqp_destination_info
{
@@ -45,20 +45,32 @@
} axis2_amqp_destination_info_t;
AXIS2_EXTERN axis2_char_t* AXIS2_CALL
- axis2_amqp_util_get_conf_value_string(
+ axis2_amqp_util_get_in_desc_conf_value_string(
axis2_transport_in_desc_t* in_desc,
const axutil_env_t* env,
const axis2_char_t* param_name);
AXIS2_EXTERN int AXIS2_CALL
- axis2_amqp_util_get_conf_value_int(
+ axis2_amqp_util_get_in_desc_conf_value_int(
axis2_transport_in_desc_t* in_desc,
const axutil_env_t* env,
const axis2_char_t* param_name);
+ AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+ axis2_amqp_util_get_out_desc_conf_value_string(
+ axis2_transport_out_desc_t* out_desc,
+ const axutil_env_t* env,
+ const axis2_char_t* param_name);
+
+ AXIS2_EXTERN int AXIS2_CALL
+ axis2_amqp_util_get_out_desc_conf_value_int(
+ axis2_transport_out_desc_t* out_desc,
+ const axutil_env_t* env,
+ const axis2_char_t* param_name);
+
AXIS2_EXTERN axiom_soap_envelope_t* AXIS2_CALL
axis2_amqp_util_get_soap_envelope(
- axis2_amqp_binary_data_buffer_t* binary_data_buffer,
+ axis2_amqp_response_t* response,
const axutil_env_t* env,
axis2_msg_ctx_t* msg_ctx);
@@ -104,6 +116,26 @@
axis2_msg_ctx_t* msg_ctx,
const axutil_env_t* env);
+ AXIS2_EXTERN int AXIS2_CALL
+ axis2_amqp_util_msg_ctx_get_request_timeout(
+ axis2_msg_ctx_t* msg_ctx,
+ const axutil_env_t* env);
+
+ AXIS2_EXTERN axis2_bool_t AXIS2_CALL
+ axis2_amqp_util_msg_ctx_get_server_side(
+ axis2_msg_ctx_t* msg_ctx,
+ const axutil_env_t* env);
+
+ AXIS2_EXTERN void AXIS2_CALL
+ axis2_amqp_response_free(
+ axis2_amqp_response_t* response,
+ const axutil_env_t* env);
+
+ AXIS2_EXTERN void AXIS2_CALL
+ axis2_amqp_destination_info_free(
+ axis2_amqp_destination_info_t* destination_info,
+ const axutil_env_t* env);
+
#ifdef __cplusplus
}
#endif