You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by sh...@apache.org on 2008/11/27 07:24:07 UTC

svn commit: r721090 - in /webservices/axis2/trunk/c/src/core/transport/amqp: receiver/ receiver/qpid_receiver/request_processor/ sender/ sender/qpid_sender/ server/axis2_amqp_server/ util/

Author: shankar
Date: Wed Nov 26 22:24:07 2008
New Revision: 721090

URL: http://svn.apache.org/viewvc?rev=721090&view=rev
Log:
Applying the patch given in AXIS2C-1298.

Modified:
    webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c
    webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c
    webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c
    webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp
    webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h
    webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp
    webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h
    webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c
    webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h
    webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c
    webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/axis2_amqp_receiver.c Wed Nov 26 22:24:07 2008
@@ -70,7 +70,7 @@
 		axutil_property_t* property = NULL;
 		const axis2_char_t* broker_ip = NULL;
 		int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
-		*broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
+		*broker_port = AXIS2_QPID_NULL_CONF_INT;
 
         receiver_resource_pack->conf_ctx_private = axis2_build_conf_ctx(env, repo);
         if (!receiver_resource_pack->conf_ctx_private)
@@ -86,7 +86,7 @@
 				AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);
 
 		/* Set broker port */
-    	*broker_port = (qpid_broker_port != AXIS2_QPID_NULL_BROKER_PORT) ? 
+    	*broker_port = (qpid_broker_port != AXIS2_QPID_NULL_CONF_INT) ? 
 			qpid_broker_port : AXIS2_QPID_DEFAULT_BROKER_PORT;
 		property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port);
 		axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env,
@@ -110,7 +110,7 @@
 	axutil_property_t* property = NULL;
 	const axis2_char_t* broker_ip = NULL;
 	int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
-	*broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
+	*broker_port = AXIS2_QPID_NULL_CONF_INT;
 
 	AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
 
@@ -118,22 +118,26 @@
 	receiver_resource_pack->conf_ctx = conf_ctx;
 	
 	/* Set broker IP */
-	broker_ip = axis2_amqp_util_get_conf_value_string(in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_IP); 
+	broker_ip = axis2_amqp_util_get_in_desc_conf_value_string(
+			in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_IP); 
 	if (!broker_ip)
 	{
 		broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP;
 	}
-	property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_ip);
+	property = axutil_property_create_with_args(
+			env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_ip);
 	axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env, 
 			AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);
 
 	/* Set broker port */
-	*broker_port = axis2_amqp_util_get_conf_value_int(in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_PORT);
-    if (!broker_port)
+	*broker_port = axis2_amqp_util_get_in_desc_conf_value_int(
+			in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_PORT);
+    if (*broker_port == AXIS2_QPID_NULL_CONF_INT)
 	{
 		*broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
 	}
-	property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port);
+	property = axutil_property_create_with_args(
+			env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port);
 	axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
 			AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property);
 	
@@ -250,7 +254,7 @@
 {
 	int status = AXIS2_SUCCESS;
 
-	*inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_BROKER_PORT);
+	*inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_CONF_INT);
 	if (!(*inst))
 	{
 		status = AXIS2_FAILURE;

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c Wed Nov 26 22:24:07 2008
@@ -99,7 +99,6 @@
 	axis2_status_t status = AXIS2_FAILURE;
 	axutil_hash_t *binary_data_map = NULL;
 	axiom_soap_body_t *soap_body = NULL;
-	axis2_endpoint_ref_t* reply_to = NULL;
 	axutil_property_t* reply_to_property = NULL;
 
 	/* Create msg_ctx */
@@ -307,13 +306,10 @@
 	/* SOAP version */
 	axis2_msg_ctx_set_is_soap_11(msg_ctx, env, is_soap_11);
 
-	/* Set ReplyTo in the msg_ctx */
-	reply_to = axis2_endpoint_ref_create(env, request_resource_pack->reply_to);
-	axis2_msg_ctx_set_reply_to(msg_ctx, env, reply_to);
-
-	/* Set ReplyTo in the msg_ctx as a property. This is useful when 
-	 * server replies in the dual-channel case */
-	reply_to_property = axutil_property_create_with_args(env, AXIS2_SCOPE_REQUEST, 0, 0, 
+	/* Set ReplyTo in the msg_ctx as a property. This is used by the server when
+	 * 1. WS-A is not in use 
+	 * 2. ReplyTo is an anonymous EPR - Sandesha2/Dual-channel */
+	reply_to_property = axutil_property_create_with_args(env, AXIS2_SCOPE_REQUEST, 0, 0,
 			(void*)request_resource_pack->reply_to);
 	axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO, 
 			reply_to_property);

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c Wed Nov 26 22:24:07 2008
@@ -60,10 +60,27 @@
 	axis2_transport_out_desc_t* out_desc)
 {
 	axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
-    sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
+    axutil_property_t* property = NULL;
+	int* request_timeout = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
+	*request_timeout = AXIS2_QPID_NULL_CONF_INT;
 
+	AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+
+	sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
 	sender_resource_pack->conf_ctx = conf_ctx;
 
+	/* Set request timeout */
+    *request_timeout = axis2_amqp_util_get_out_desc_conf_value_int(
+            out_desc, env, AXIS2_AMQP_CONF_QPID_REQUEST_TIMEOUT);
+    if (*request_timeout == AXIS2_QPID_NULL_CONF_INT)
+    {
+        *request_timeout = AXIS2_QPID_DEFAULT_REQUEST_TIMEOUT;
+    }
+    property = axutil_property_create_with_args(
+            env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)request_timeout);
+    axis2_conf_ctx_set_property(sender_resource_pack->conf_ctx, env,
+            AXIS2_AMQP_CONF_CTX_PROPERTY_REQUEST_TIMEOUT, property);
+
 	return AXIS2_SUCCESS;
 }
 
@@ -81,7 +98,6 @@
 	axiom_xml_writer_t* xml_writer = NULL;
 	axiom_output_t* request_om_output = NULL;
 	axis2_char_t* request_content = NULL;
-	axiom_soap_envelope_t* response_soap_envelope = NULL;
 	axis2_bool_t is_server = AXIS2_TRUE;
 	axis2_bool_t is_soap_11 = AXIS2_FALSE;
 	axutil_string_t* content_type = NULL;
@@ -171,71 +187,92 @@
 		return AXIS2_FAILURE;
 	}
 	
-	is_server = axis2_msg_ctx_get_server_side(msg_ctx, env);
+	is_server = axis2_amqp_util_msg_ctx_get_server_side(msg_ctx, env);
 
 	if (is_server)
 	{
-		status = axis2_qpid_server_send(request_content, env, 
+		status = axis2_qpid_send(request_content, env, 
 				axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
 	}
 	else
 	{
-		axis2_op_t* op = NULL;
-		const axis2_char_t* mep = NULL;
-		
-		op = axis2_msg_ctx_get_op(msg_ctx, env);
-		mep = axis2_op_get_msg_exchange_pattern(op, env);
-
-		if (axutil_strcmp(mep, AXIS2_MEP_URI_OUT_ONLY) == 0 || 
-			axutil_strcmp(mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY) == 0) /* One-way */
+		if (AXIS2_TRUE == axis2_amqp_util_msg_ctx_get_use_separate_listener(
+					msg_ctx, env)) /* Dual Channel */
 		{
-			status = axis2_qpid_client_send_robust(request_content, env, 
+			status = axis2_qpid_send(request_content, env, 
 					axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
-
-			/* Set Status Code in msg_ctx */
-			axis2_msg_ctx_set_status_code(msg_ctx, env, status);
 		}
 		else
 		{
-			if (AXIS2_TRUE == axis2_amqp_util_msg_ctx_get_use_separate_listener(msg_ctx, env)) /* Dual Channel */
+			axis2_op_t* op = NULL;
+			const axis2_char_t* mep = NULL;
+			
+			op = axis2_msg_ctx_get_op(msg_ctx, env);
+			
+			if (op)
+			{
+				mep = axis2_op_get_msg_exchange_pattern(op, env);
+			}
+
+			axis2_amqp_response_t* response = NULL;
+			response = axis2_qpid_send_receive(request_content, env, 
+					axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+		
+			if (response)
 			{
-				axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
-				sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
+				/* Create in stream */
+				if (response->data)
+				{
+					axutil_stream_t* in_stream = NULL;
+					axutil_property_t* property = NULL;
+				
+					in_stream = axutil_stream_create_basic(env);
+					axutil_stream_write(in_stream, env, response->data, 
+							response->length);
+			
+					property = axutil_property_create(env);
+					axutil_property_set_scope(property, env, AXIS2_SCOPE_REQUEST);
+					axutil_property_set_free_func(property, env, axutil_stream_free_void_arg);
+					axutil_property_set_value(property, env, in_stream);
 				
-				if (!sender_resource_pack->conf_ctx)
-					return AXIS2_FAILURE;
+					axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, property);
+				}
+			
+				if (mep)
+				{
+					if (0 == axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN)) /* Out-In */
+					{
+						axiom_soap_envelope_t* response_soap_envelope = NULL;
 					
-				axis2_char_t* reply_to_queue_name = NULL;
-				reply_to_queue_name = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(
-						sender_resource_pack->conf_ctx, env);
-
-				if (!reply_to_queue_name)
-					return AXIS2_FAILURE;
+						response_soap_envelope = axis2_amqp_util_get_soap_envelope(response, env, msg_ctx);
+						if (response_soap_envelope)
+						{
+							axis2_msg_ctx_set_response_soap_envelope(msg_ctx, env, response_soap_envelope);
+						}
+					}
+				}
+			
+				status = AXIS2_SUCCESS;
 				
-				status = axis2_qpid_client_send_dual(request_content, env, reply_to_queue_name, 
-						axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+				axis2_msg_ctx_set_status_code(msg_ctx, env, status);
+
+				axis2_amqp_response_free(response, env);
 			}
-			else /* Single Channel */
+			else
 			{
-				axis2_amqp_binary_data_buffer_t* binary_data_buffer = NULL;
-				binary_data_buffer = axis2_qpid_client_send_receive(request_content, env, 
-						axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
-
-				if (binary_data_buffer)
+				if (mep)
 				{
-					response_soap_envelope = axis2_amqp_util_get_soap_envelope(binary_data_buffer, env, msg_ctx);
-				
-					if (response_soap_envelope)
-						axis2_msg_ctx_set_response_soap_envelope(msg_ctx, env, response_soap_envelope);
-
-					AXIS2_FREE(env->allocator, binary_data_buffer->data);
-					AXIS2_FREE(env->allocator, binary_data_buffer->content_type);
-					AXIS2_FREE(env->allocator, binary_data_buffer);
-
-					status = AXIS2_SUCCESS;
+					if (axutil_strcmp(mep, AXIS2_MEP_URI_OUT_ONLY) == 0 || 
+						axutil_strcmp(mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY) == 0) /* One-way */
+					{
+						status = AXIS2_SUCCESS;
+						
+						/* Set status code in msg_ctx */
+						axis2_msg_ctx_set_status_code(msg_ctx, env, status);
+					}
 				}
 			}
-        }
+		}
 	}
 
 	if (content_type)

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp Wed Nov 26 22:24:07 2008
@@ -15,12 +15,19 @@
  * limitations under the License.
  */
 
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/sys/Time.h>
 #include <axis2_amqp_defines.h>
 #include <axiom_mime_part.h>
-#include <fstream>
 #include <axis2_qpid_sender.h>
+#include <fstream>
 
 using namespace std;
+using namespace qpid::client;
+using namespace qpid::framing;
 
 Axis2QpidSender::Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env)
 {
@@ -29,19 +36,15 @@
 	this->env = env;
 	this->responseContent = "";
 	this->responseContentType = "";
-	this->subscriptionManager = NULL;
 }
 
 
 Axis2QpidSender::~Axis2QpidSender(void)
-{
-	if (subscriptionManager)
-		delete subscriptionManager;
-}
+{}
 
 
-bool Axis2QpidSender::ClientSendReceive(string messageContent, string toQueueName, bool isSOAP11, 
-		string contentType, string soapAction, axutil_array_list_t* mime_parts)
+bool Axis2QpidSender::SendReceive(string messageContent, string toQueueName, bool isSOAP11, 
+		string contentType, string soapAction, axutil_array_list_t* mime_parts, int timeout)
 {
 	bool status = false;
 	this->responseContent = "";
@@ -64,13 +67,13 @@
 							 arg::bindingKey = replyToQueueName);
 
 		/* Create Message */
-		Message message;
+		Message reqMessage;
 		
-		message.getDeliveryProperties().setRoutingKey(toQueueName);
-		message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
+		reqMessage.getDeliveryProperties().setRoutingKey(toQueueName);
+		reqMessage.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
 
-		message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
-		message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
+		reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
+		reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
 
 		if (mime_parts)
 		{
@@ -82,77 +85,25 @@
 			messageContent.append(mimeBody);
 		}
 
-		message.setData(messageContent);
+		reqMessage.setData(messageContent);
 	
-		async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
-		
-		/* Create Subscription manager */
-		subscriptionManager = new SubscriptionManager(session);
-		
-		subscriptionManager->subscribe(*this, replyToQueueName);
-		subscriptionManager->run(); 
-		
-		/* Current thread gets bloked here until the response hits the message listener */
-
-		connection.close();
-		
-		status = true;
-	}
-	catch (const std::exception& e)
-	{
-	}
-
-	return status;
-}
-
-
-bool Axis2QpidSender::ClientSendRobust(string messageContent, string toQueueName, bool isSOAP11, 
-		string contentType, string soapAction, axutil_array_list_t* mime_parts)
-{
-	bool status = false;
-
-	try
-	{
-		Connection connection;
-		connection.open(qpidBrokerIP, qpidBrokerPort);
+		async(session).messageTransfer(arg::content = reqMessage, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
 		
-		Session session = connection.newSession();
-
-		/* Declare Private Queue */
-		string replyToQueueName = AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX;
-		replyToQueueName.append(axutil_uuid_gen(env));
-
-		session.queueDeclare(arg::queue = replyToQueueName);
-		session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, 
-							 arg::queue = replyToQueueName,
-							 arg::bindingKey = replyToQueueName);
-
-		/* Create Message */ 
-		Message message;
-
-		message.getDeliveryProperties().setRoutingKey(toQueueName);
-		message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
+		/* Create subscription manager */
+		SubscriptionManager subscriptionManager(session);
 		
-		message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
-		message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
+		Message resMessage;
+		qpid::sys::Duration reqTimeout(timeout * AXIS2_AMQP_NANOSEC_PER_MILLISEC);
 
-		if (mime_parts)
+		if (subscriptionManager.get(resMessage, replyToQueueName, reqTimeout))
 		{
-			string mimeBody;
-			GetMimeBody(mime_parts, mimeBody);
+			responseContent = resMessage.getData();
+			responseContentType = resMessage.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
 		
-			messageContent.clear();/* MIME parts include SOAP envelop */
-
-			messageContent.append(mimeBody);
+			status = true;
 		}
 
-		message.setData(messageContent);
-
-		async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
-
 		connection.close();
-
-		status = true;
 	}
 	catch (const std::exception& e)
 	{
@@ -162,7 +113,7 @@
 }
 
 
-bool Axis2QpidSender::ClientSendDual(string messageContent, string toQueueName, string replyToQueueName, 
+bool Axis2QpidSender::Send(string messageContent, string toQueueName, string replyToQueueName, 
 		bool isSOAP11, string contentType, string soapAction, axutil_array_list_t* mime_parts)
 {
 	bool status = false;
@@ -174,62 +125,20 @@
 		
 		Session session = connection.newSession();
 
-		session.queueDeclare(arg::queue = replyToQueueName);
-		session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
-							 arg::queue = replyToQueueName,
-							 arg::bindingKey = replyToQueueName);
-
 		/* Create Message */ 
 		Message message;
 
 		message.getDeliveryProperties().setRoutingKey(toQueueName);
-		message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
-		
-		message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
-		message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
 
-		if (mime_parts)
+		if (!replyToQueueName.empty()) /* Client dual-channel */
 		{
-			string mimeBody;
-			GetMimeBody(mime_parts, mimeBody);
-		
-			messageContent.clear();/* MIME parts include SOAP envelop */
+			message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
 
-			messageContent.append(mimeBody);
+			session.queueDeclare(arg::queue = replyToQueueName);
+			session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
+								 arg::queue = replyToQueueName,
+							 	 arg::bindingKey = replyToQueueName);
 		}
-
-		message.setData(messageContent);
-
-		async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
-
-		connection.close();
-
-		status = true;
-	}
-	catch (const std::exception& e)
-	{
-	}
-
-	return status;
-}
-
-
-bool Axis2QpidSender::ServerSend(string messageContent, string toQueueName, bool isSOAP11, 
-		string contentType, string soapAction, axutil_array_list_t* mime_parts)
-{
-	bool status = false;
-
-	try
-	{
-		Connection connection;
-		connection.open(qpidBrokerIP, qpidBrokerPort);
-		
-		Session session = connection.newSession();
-
-		/* Create Message */ 
-		Message message;
-
-		message.getDeliveryProperties().setRoutingKey(toQueueName);
 		
 		message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
 		message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
@@ -259,15 +168,6 @@
 	return status;
 }
 
-void Axis2QpidSender::received(Message& message)
-{
-	responseContent = message.getData();
-	responseContentType = message.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
-
-	if (subscriptionManager)
-		subscriptionManager->cancel(message.getDestination());
-}
-
 
 void Axis2QpidSender::GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody)
 {

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h Wed Nov 26 22:24:07 2008
@@ -18,51 +18,32 @@
 #ifndef AXIS2_QPID_SENDER_H
 #define AXIS2_QPID_SENDER_H
 
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/Message.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/SubscriptionManager.h>
 #include <axis2_util.h>
-
 #include <sstream>
 #include <string>
 
-using namespace qpid::client;
-using namespace qpid::framing;
-using std::stringstream;
 using std::string;
 
-class Axis2QpidSender : public MessageListener
+class Axis2QpidSender
 {
 	public:
 		Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env);
 		~Axis2QpidSender(void);
 
-		/* Client */
-		bool ClientSendReceive(string messageContent, string toQueueName, bool isSOAP11, 
-				string contentType, string soapAction, axutil_array_list_t* mime_parts);
-		bool ClientSendRobust(string messageContent, string toQueueName, bool isSOAP11, 
-				string contentType, string soapAction, axutil_array_list_t* mime_parts);
-		bool ClientSendDual(string messageContent, string toQueueName, string replyToQueueName, bool isSOAP11, 
+		bool SendReceive(string messageContent, string toQueueName, bool isSOAP11, 
+				string contentType, string soapAction, axutil_array_list_t* mime_parts, int timeout);
+		bool Send(string messageContent, string toQueueName, string replyToQueueName, bool isSOAP11, 
 				string contentType, string soapAction, axutil_array_list_t* mime_parts);
 		
-		/* Server */
-		bool ServerSend(string messageContent, string toQueueName, bool isSOAP11, 
-				string contentType, string soapAction, axutil_array_list_t* mime_parts);
-
 		string 		 responseContent;
 		string		 responseContentType;
 
 	private:
-		virtual void received(Message& message);
-
 		void GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody);
 
 		string		  		 qpidBrokerIP;
 		int			  		 qpidBrokerPort;
 		const axutil_env_t*  env;
-		SubscriptionManager* subscriptionManager;
 };
 
 #endif

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp Wed Nov 26 22:24:07 2008
@@ -24,8 +24,8 @@
 {
 #endif
 
-AXIS2_EXTERN axis2_amqp_binary_data_buffer_t* AXIS2_CALL
-axis2_qpid_client_send_receive(
+AXIS2_EXTERN axis2_amqp_response_t* AXIS2_CALL
+axis2_qpid_send_receive(
 	const axis2_char_t* request_content,
 	const axutil_env_t* env,
 	const axis2_char_t* content_type,
@@ -33,7 +33,6 @@
 	axis2_msg_ctx_t* msg_ctx)
 {
 	axis2_amqp_destination_info_t* destination_info = NULL;
-
 	destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
 
 	if (!destination_info || !destination_info->broker_ip || 
@@ -44,45 +43,45 @@
 
 	axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
 	axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
+	int timeout = axis2_amqp_util_msg_ctx_get_request_timeout(msg_ctx, env);
 
 	/* Get Response */
 	Axis2QpidSender qpid_sender(destination_info->broker_ip, 
 			destination_info->broker_port, env);
-	bool status = qpid_sender.ClientSendReceive(request_content, destination_info->queue_name, 
-			is_soap_11, content_type, soap_action, mime_parts);
+	
+	bool status = qpid_sender.SendReceive(request_content, destination_info->queue_name, 
+			is_soap_11, content_type, soap_action, mime_parts, timeout);
 
-	AXIS2_FREE(env->allocator, destination_info->broker_ip);
-	AXIS2_FREE(env->allocator, destination_info->queue_name);
-	AXIS2_FREE(env->allocator, destination_info);
+	axis2_amqp_destination_info_free(destination_info, env);
 
 	if (!status)
 	{
 		return NULL;
 	}
 
-	/* Create a Copy and Return */
-	axis2_amqp_binary_data_buffer_t* binary_data_buffer = (axis2_amqp_binary_data_buffer_t*)
-		AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_binary_data_buffer_t));
+	/* Create response */
+	axis2_amqp_response_t* response = (axis2_amqp_response_t*)AXIS2_MALLOC(
+			env->allocator, sizeof(axis2_amqp_response_t));
 
 	/* Data */
-	binary_data_buffer->data = AXIS2_MALLOC(env->allocator, qpid_sender.responseContent.size());
-	memcpy(binary_data_buffer->data, qpid_sender.responseContent.c_str(), 
+	response->data = AXIS2_MALLOC(env->allocator, qpid_sender.responseContent.size());
+	memcpy(response->data, qpid_sender.responseContent.c_str(), 
 			qpid_sender.responseContent.size());
 
 	/* Length */
-	binary_data_buffer->length = qpid_sender.responseContent.size();
+	response->length = qpid_sender.responseContent.size();
 
 	/* ContentType */
-	binary_data_buffer->content_type = (axis2_char_t*)
-		AXIS2_MALLOC(env->allocator, qpid_sender.responseContentType.size() + 1);
-	strcpy(binary_data_buffer->content_type, qpid_sender.responseContentType.c_str());
+	response->content_type = (axis2_char_t*)AXIS2_MALLOC(
+			env->allocator, qpid_sender.responseContentType.size() + 1);
+	strcpy(response->content_type, qpid_sender.responseContentType.c_str());
 
-	return binary_data_buffer;
+	return response;
 }
 
 
 AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_robust(
+axis2_qpid_send(
 	const axis2_char_t* request_content,
 	const axutil_env_t* env,
 	const axis2_char_t* content_type,
@@ -91,6 +90,7 @@
 {
 	axis2_amqp_destination_info_t* destination_info = NULL;
 	axis2_status_t status =  AXIS2_FAILURE;
+	string reply_to_queue_name = "";
 
 	destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
 
@@ -102,89 +102,25 @@
 
 	axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
 	axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
-	
-	Axis2QpidSender qpid_sender(destination_info->broker_ip, 
-			destination_info->broker_port, env);
-
-	status = qpid_sender.ClientSendRobust(request_content, destination_info->queue_name, 
-			is_soap_11, content_type, soap_action, mime_parts);
 
-	AXIS2_FREE(env->allocator, destination_info->broker_ip);
-	AXIS2_FREE(env->allocator, destination_info->queue_name);
-	AXIS2_FREE(env->allocator, destination_info);
-
-	return status;
-}
- 
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_dual(
-	const axis2_char_t* request_content,
-	const axutil_env_t* env,
-	const axis2_char_t* reply_to_queue_name,
-	const axis2_char_t* content_type,
-	const axis2_char_t* soap_action,
-	axis2_msg_ctx_t* msg_ctx)
-{
-	axis2_amqp_destination_info_t* destination_info = NULL;
-	axis2_status_t status =  AXIS2_FAILURE;
-
-	destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
-
-	if (!destination_info || !destination_info->broker_ip || 
-		!destination_info->broker_port || !destination_info->queue_name)
+	/* If client side, find reply_to_queue_name */
+	if (!axis2_msg_ctx_get_server_side(msg_ctx, env))
 	{
-		return AXIS2_FAILURE;
-	}
-
-	axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
-	axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
-	
-	Axis2QpidSender qpid_sender(destination_info->broker_ip, 
-			destination_info->broker_port, env);
-
-	status = qpid_sender.ClientSendDual(request_content, destination_info->queue_name, 
-			reply_to_queue_name, is_soap_11, content_type, soap_action, mime_parts);
-
-	AXIS2_FREE(env->allocator, destination_info->broker_ip);
-	AXIS2_FREE(env->allocator, destination_info->queue_name);
-	AXIS2_FREE(env->allocator, destination_info);
+		axis2_conf_ctx_t* conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
 
-	return status;
-}
- 
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_server_send(
-	const axis2_char_t* request_content,
-	const axutil_env_t* env,
-	const axis2_char_t* content_type,
-	const axis2_char_t* soap_action,
-	axis2_msg_ctx_t* msg_ctx)
-{
-	axis2_amqp_destination_info_t* destination_info = NULL;
-	axis2_status_t status =  AXIS2_FAILURE;
-
-	destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
-
-	if (!destination_info || !destination_info->broker_ip || 
-		!destination_info->broker_port || !destination_info->queue_name)
-	{
-		return AXIS2_FAILURE;
+		axis2_char_t* queue_name = 
+			axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx, env);
+		if (queue_name)
+			reply_to_queue_name = queue_name;
 	}
 
-	axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
-	axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
-
 	Axis2QpidSender qpid_sender(destination_info->broker_ip, 
 			destination_info->broker_port, env);
 
-	status = qpid_sender.ServerSend(request_content, destination_info->queue_name, 
-			is_soap_11, content_type, soap_action, mime_parts);
+	status = qpid_sender.Send(request_content, destination_info->queue_name, 
+			reply_to_queue_name, is_soap_11, content_type, soap_action, mime_parts);
 
-	AXIS2_FREE(env->allocator, destination_info->broker_ip);
-	AXIS2_FREE(env->allocator, destination_info->queue_name);
-	AXIS2_FREE(env->allocator, destination_info);
+	axis2_amqp_destination_info_free(destination_info, env);
 
 	return status;
 }

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h Wed Nov 26 22:24:07 2008
@@ -27,8 +27,8 @@
 {
 #endif
 
-AXIS2_EXTERN axis2_amqp_binary_data_buffer_t* AXIS2_CALL
-axis2_qpid_client_send_receive(
+AXIS2_EXTERN axis2_amqp_response_t* AXIS2_CALL
+axis2_qpid_send_receive(
 	const axis2_char_t* request_content,
 	const axutil_env_t* env,
 	const axis2_char_t* content_type,
@@ -36,24 +36,7 @@
 	axis2_msg_ctx_t* msg_ctx);
 
 AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_robust(
-	const axis2_char_t* request_content,
-	const axutil_env_t* env,
-	const axis2_char_t* content_type,
-	const axis2_char_t* soap_action,
-	axis2_msg_ctx_t* msg_ctx);
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_client_send_dual(
-	const axis2_char_t* request_content,
-	const axutil_env_t* env,
-	const axis2_char_t* reply_to_queue_name,
-	const axis2_char_t* content_type,
-	const axis2_char_t* soap_action,
-	axis2_msg_ctx_t* msg_ctx);
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_server_send(
+axis2_qpid_send(
 	const axis2_char_t* request_content,
 	const axutil_env_t* env,
 	const axis2_char_t* content_type,

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c Wed Nov 26 22:24:07 2008
@@ -133,7 +133,7 @@
 	extern int optopt;
 	int c;
 	const axis2_char_t* qpid_broker_ip = NULL;
-	int qpid_broker_port = AXIS2_QPID_NULL_BROKER_PORT;
+	int qpid_broker_port = AXIS2_QPID_NULL_CONF_INT;
 	const axis2_char_t* repo_path = AXIS2_AMQP_SERVER_REPO_PATH;
 	axutil_log_levels_t log_level = AXIS2_LOG_LEVEL_DEBUG;
 	const axis2_char_t* log_file_name = AXIS2_AMQP_SERVER_LOG_FILE_NAME;

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h Wed Nov 26 22:24:07 2008
@@ -24,13 +24,16 @@
 
 #define AXIS2_AMQP_CONF_QPID_BROKER_IP				 "qpid_broker_ip"
 #define AXIS2_AMQP_CONF_QPID_BROKER_PORT			 "qpid_broker_port"
+#define AXIS2_AMQP_CONF_QPID_REQUEST_TIMEOUT		 "request_timeout"
 
 #define AXIS2_QPID_DEFAULT_BROKER_IP    			 "127.0.0.1"
 #define AXIS2_QPID_DEFAULT_BROKER_PORT  			 5672
-#define AXIS2_QPID_NULL_BROKER_PORT  			 	 0
+#define AXIS2_QPID_DEFAULT_REQUEST_TIMEOUT  		 500
+#define AXIS2_QPID_NULL_CONF_INT	  			 	 -1
 
 #define AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP		 "qpid_broker_ip"
 #define AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT	 "qpid_broker_port"
+#define AXIS2_AMQP_CONF_CTX_PROPERTY_REQUEST_TIMEOUT "request_timeout"
 #define AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME		 "queue_name"
 
 #define AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO		 "qpid_reply_to"
@@ -51,10 +54,12 @@
 #define AXIS2_AMQP_EPR_SERVICE_PREFIX				 "services"
 #define AXIS2_AMQP_EPR_ANON_SERVICE_NAME			 "__ANONYMOUS_SERVICE__"
 
-#define AXIS2_AMQP_EQ 				'='
-#define AXIS2_AMQP_SEMI_COLON 		';'
-#define AXIS2_AMQP_ESC_NULL 		'\0'
-#define AXIS2_AMQP_DOUBLE_QUOTE 	'"'
-#define AXIS2_AMQP_B_SLASH 			'\\'
+#define AXIS2_AMQP_EQ 								 '='
+#define AXIS2_AMQP_SEMI_COLON 						 ';'
+#define AXIS2_AMQP_ESC_NULL 						 '\0'
+#define AXIS2_AMQP_DOUBLE_QUOTE 					 '"'
+#define AXIS2_AMQP_B_SLASH 							 '\\'
+
+#define AXIS2_AMQP_NANOSEC_PER_MILLISEC				 (1000*1000) /* parenthesized so the product stays intact next to operators such as '/' or '%' */
 
 #endif

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c Wed Nov 26 22:24:07 2008
@@ -24,7 +24,7 @@
 #include <axis2_amqp_util.h>
 
 AXIS2_EXTERN axis2_char_t* AXIS2_CALL
-axis2_amqp_util_get_conf_value_string(
+axis2_amqp_util_get_in_desc_conf_value_string(
 	axis2_transport_in_desc_t* in_desc,
 	const axutil_env_t* env,
 	const axis2_char_t* param_name)
@@ -47,15 +47,15 @@
 
 
 AXIS2_EXTERN int AXIS2_CALL
-axis2_amqp_util_get_conf_value_int(
+axis2_amqp_util_get_in_desc_conf_value_int(
 	axis2_transport_in_desc_t* in_desc,
 	const axutil_env_t* env,
 	const axis2_char_t* param_name)
 {
 	axis2_char_t* value_str = NULL;
-	int value = 0;
+	int value = AXIS2_QPID_NULL_CONF_INT;
 
-	value_str = axis2_amqp_util_get_conf_value_string(
+	value_str = axis2_amqp_util_get_in_desc_conf_value_string(
 			in_desc, env, param_name);
 	if (value_str)
 	{
@@ -66,9 +66,52 @@
 }
 
 
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+axis2_amqp_util_get_out_desc_conf_value_string(
+	axis2_transport_out_desc_t* out_desc,
+	const axutil_env_t* env,
+	const axis2_char_t* param_name)
+{ /* Look up param_name in out_desc's parameter container; return its value, or NULL when the parameter is absent. */
+	axutil_param_t* param = NULL;
+    axis2_char_t* value = NULL; /* stays NULL unless the parameter is found */
+
+	param = (axutil_param_t*)
+			axutil_param_container_get_param(
+				axis2_transport_out_desc_param_container(out_desc, env),
+				env,
+				param_name);
+    if (param)
+    {
+		value = axutil_param_get_value(param, env); /* pointer is handed back as-is; no copy is made here */
+    }
+
+	return value;
+}
+
+
+AXIS2_EXTERN int AXIS2_CALL
+axis2_amqp_util_get_out_desc_conf_value_int(
+	axis2_transport_out_desc_t* out_desc,
+	const axutil_env_t* env,
+	const axis2_char_t* param_name)
+{ /* Integer variant of the out_desc string lookup: converts the parameter text with atoi. */
+	axis2_char_t* value_str = NULL;
+	int value = AXIS2_QPID_NULL_CONF_INT; /* returned unchanged when the parameter has no value */
+
+	value_str = axis2_amqp_util_get_out_desc_conf_value_string(
+			out_desc, env, param_name);
+	if (value_str)
+	{
+		value = atoi(value_str); /* atoi reports no errors; text with no leading number converts to 0 */
+	}
+
+	return value;
+}
+
+
 AXIS2_EXTERN axiom_soap_envelope_t* AXIS2_CALL
 axis2_amqp_util_get_soap_envelope(
-	axis2_amqp_binary_data_buffer_t* binary_data_buffer,
+	axis2_amqp_response_t* response,
 	const axutil_env_t* env,
 	axis2_msg_ctx_t* msg_ctx)
 {
@@ -83,7 +126,7 @@
 	axutil_hash_t *binary_data_map = NULL;
 	axis2_bool_t is_soap_11 = AXIS2_FALSE;
 
-	if (!binary_data_buffer || !binary_data_buffer->data || !binary_data_buffer->content_type)
+	if (!response || !response->data || !response->content_type)
 	{
 		return NULL;
 	}
@@ -91,10 +134,10 @@
 	is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
 
 	/* Handle MTOM */
-	if (strstr(binary_data_buffer->content_type, AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED))
+	if (strstr(response->content_type, AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED))
 	{
 		axis2_char_t* mime_boundary = axis2_amqp_util_get_value_from_content_type(env,
-				binary_data_buffer->content_type,
+				response->content_type,
 				AXIS2_AMQP_HEADER_CONTENT_TYPE_MIME_BOUNDARY);
 		
 		if (mime_boundary)
@@ -165,12 +208,12 @@
 
             	if (stream)
             	{
-					axutil_stream_write(stream, env, binary_data_buffer->data, 
-							binary_data_buffer->length);
+					axutil_stream_write(stream, env, response->data, 
+							response->length);
                 	callback_ctx->env = env;
                 	callback_ctx->in_stream = stream;
-                	callback_ctx->content_length = binary_data_buffer->length;
-                	callback_ctx->unread_len = binary_data_buffer->length;
+                	callback_ctx->content_length = response->length;
+                	callback_ctx->unread_len = response->length;
                 	callback_ctx->chunked_stream = NULL;
             	}
 
@@ -203,8 +246,8 @@
 	}
 	else
 	{
-		soap_body_str = binary_data_buffer->data;
-		soap_body_len = axutil_strlen(binary_data_buffer->data);
+		soap_body_str = response->data;
+		soap_body_len = axutil_strlen(response->data);
 	}
 
 	soap_body_len = axutil_strlen(soap_body_str);
@@ -403,7 +446,7 @@
 	axutil_property_t* property = NULL;
 	axis2_char_t* queue_name = NULL;
 	axis2_char_t* value = NULL;
-	
+
 	/* Get property */
 	property = axis2_conf_ctx_get_property(conf_ctx, env, 
 			AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME);
@@ -429,7 +472,7 @@
 			AXIS2_MALLOC(env->allocator, axutil_strlen(value) + 1);
 		strcpy(queue_name, value);
 
-		axutil_property_set_value(property, env, NULL);
+		/*axutil_property_set_value(property, env, NULL);*/
 	}
 	else
 	{
@@ -451,15 +494,21 @@
 	const axutil_env_t* env)
 {
 	axutil_property_t* property = NULL;
-	axis2_char_t* broker_ip = NULL;
+	void* value = NULL;
+	axis2_char_t* broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP;
 
 	property = axis2_conf_ctx_get_property(conf_ctx, env, 
 			AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP);
 
-	if (!property)
-		return NULL;
+	if (property)
+	{
+		value = axutil_property_get_value(property, env);
 
-	broker_ip = (axis2_char_t*)axutil_property_get_value(property, env);
+		if (value)
+		{
+			broker_ip = (axis2_char_t*)value;
+		}
+	}
 
 	return broker_ip;
 }
@@ -472,7 +521,7 @@
 {
 	axutil_property_t* property = NULL;
 	void* value = NULL;
-	int broker_port = 0;
+	int broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
 
 	property = axis2_conf_ctx_get_property(conf_ctx, env, 
 			AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT);
@@ -505,7 +554,7 @@
 	if (property)
 	{
 		value = (axis2_char_t*)axutil_property_get_value(property, env);
-		
+	
 		if (value && (axutil_strcmp(AXIS2_VALUE_TRUE, value) == 0))
 		{
 			use_separate_listener = AXIS2_TRUE;
@@ -528,83 +577,85 @@
 	 * 3. TempQueue... */
 
 	axis2_endpoint_ref_t* endpoint_ref = NULL;
-	const axis2_char_t* endpoint_address_original = NULL;
-	axis2_char_t* endpoint_address = NULL;
 	axis2_amqp_destination_info_t* destination_info = NULL;
-	char* substr = NULL;
-	char* token = NULL;
-
-	endpoint_ref = axis2_msg_ctx_get_to(msg_ctx, env);
-	if (!endpoint_ref)
-		return NULL;
-
-	endpoint_address_original = axis2_endpoint_ref_get_address(endpoint_ref, env);
-	if (!endpoint_address_original)
-		return NULL;
-
-	endpoint_address = (axis2_char_t*)AXIS2_MALLOC(env->allocator, 
-			(sizeof(axis2_char_t) * axutil_strlen(endpoint_address_original)) + 1);
-	strcpy((char*)endpoint_address, (char*)endpoint_address_original);
 
 	destination_info = (axis2_amqp_destination_info_t*)
 		AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_destination_info_t));
 
 	destination_info->broker_ip = NULL;
-	destination_info->broker_port = AXIS2_QPID_NULL_BROKER_PORT;
+	destination_info->broker_port = AXIS2_QPID_NULL_CONF_INT;
 	destination_info->queue_name = NULL;
-
-	if ((substr = strstr(endpoint_address, AXIS2_AMQP_EPR_PREFIX))) /* Start with amqp: */ 
+	
+	endpoint_ref = axis2_msg_ctx_get_to(msg_ctx, env);
+	
+	if (endpoint_ref)
 	{
-		if (strstr(endpoint_address, AXIS2_AMQP_EPR_ANON_SERVICE_NAME))
-		{
-			/* Server reply to dual-channel client */
-			axutil_property_t* property = NULL;
-            property = axis2_msg_ctx_get_property(msg_ctx, env,
-                    AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
+		const axis2_char_t* endpoint_address_original = NULL;
+		axis2_char_t* endpoint_address = NULL;
+		char* substr = NULL;
+		char* token = NULL;
+		endpoint_address_original = axis2_endpoint_ref_get_address(endpoint_ref, env);
+	
+		if (!endpoint_address_original)
+			return NULL;
 
-            if (property)
-            {
-                axis2_char_t* queue_name = (axis2_char_t*)
-                    axutil_property_get_value(property, env);
+		endpoint_address = (axis2_char_t*)AXIS2_MALLOC(env->allocator, 
+				(sizeof(axis2_char_t) * axutil_strlen(endpoint_address_original)) + 1);
+		strcpy((char*)endpoint_address, (char*)endpoint_address_original);
+		
+		if ((substr = strstr(endpoint_address, AXIS2_AMQP_EPR_PREFIX))) /* Start with amqp: */
+		{
+			if (strstr(endpoint_address, AXIS2_AMQP_EPR_ANON_SERVICE_NAME))
+			{
+				/* Server reply to dual-channel client */
+				axutil_property_t* property = NULL;
+				property = axis2_msg_ctx_get_property(msg_ctx, env,
+						AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
 
-				if (queue_name)
+				if (property)
 				{
-					destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
-							env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
-					strcpy(destination_info->queue_name, queue_name);
+					axis2_char_t* queue_name = (axis2_char_t*)
+						axutil_property_get_value(property, env);
+
+					if (queue_name)
+					{
+						destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
+								env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
+						strcpy(destination_info->queue_name, queue_name);
+					}
 				}
-            }
-		}
-		else
-		{
-			substr+= strlen(AXIS2_AMQP_EPR_PREFIX) + 2; /* 2 -> "//" */
-			if (substr) /* IP:PORT/services/SERVICE_NAME */
+			}
+			else
 			{
-				token = strtok(substr, ":");
-				if (token) /* IP */
+				substr+= strlen(AXIS2_AMQP_EPR_PREFIX) + 2; /* 2 -> "//" */
+				if (substr) /* IP:PORT/services/SERVICE_NAME */
 				{
-					axis2_char_t* broker_ip = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
-							(sizeof(axis2_char_t) * strlen(token)) + 1);
-					strcpy(broker_ip, token);
-					destination_info->broker_ip = broker_ip;
-
-					token = strtok(NULL, "/"); /* PORT */
-					if (token)
+					token = strtok(substr, ":");
+					if (token) /* IP */
 					{
-						destination_info->broker_port = atoi(token);
+						axis2_char_t* broker_ip = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+								(sizeof(axis2_char_t) * strlen(token)) + 1);
+						strcpy(broker_ip, token);
+						destination_info->broker_ip = broker_ip;
 
-						token = strtok(NULL, "#"); /* ... services/SERVICE_NAME */
+						token = strtok(NULL, "/"); /* PORT */
 						if (token)
 						{
-							if ((substr = strstr(token, AXIS2_AMQP_EPR_SERVICE_PREFIX)))
+							destination_info->broker_port = atoi(token);
+
+							token = strtok(NULL, "#"); /* ... services/SERVICE_NAME */
+							if (token)
 							{
-								substr+= strlen(AXIS2_AMQP_EPR_SERVICE_PREFIX) + 1; /* 1 -> "/" */ 
-								if (substr)
+								if ((substr = strstr(token, AXIS2_AMQP_EPR_SERVICE_PREFIX)))
 								{
-									axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
-											(sizeof(axis2_char_t) * strlen(substr)) + 1);
-									strcpy(queue_name, substr);
-									destination_info->queue_name = queue_name;
+									substr+= strlen(AXIS2_AMQP_EPR_SERVICE_PREFIX) + 1; /* 1 -> "/" */
+									if (substr)
+									{
+										axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+												(sizeof(axis2_char_t) * strlen(substr)) + 1);
+										strcpy(queue_name, substr);
+										destination_info->queue_name = queue_name;
+									}
 								}
 							}
 						}
@@ -612,10 +663,36 @@
 				}
 			}
 		}
+		else if (0 == strcmp(endpoint_address, AXIS2_WSA_ANONYMOUS_URL)) /* Required to work with Sandesha2 */
+		{
+			axutil_property_t* property = NULL;
+			property = axis2_msg_ctx_get_property(msg_ctx, env, 
+					AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
+		
+			if (property)
+			{
+				axis2_char_t* queue_name = (axis2_char_t*)
+					axutil_property_get_value(property, env);
+			
+				if (queue_name)
+				{
+					destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
+							env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
+					strcpy(destination_info->queue_name, queue_name);
+				}
+			}
+		}
+		else if ((substr = strstr(endpoint_address, "jms:/")) &&
+			 	 (substr == endpoint_address))
+		{
+
+		}
+		
+		AXIS2_FREE(env->allocator, endpoint_address);
 	}
-	else if (0 == strcmp(endpoint_address, AXIS2_WSA_ANONYMOUS_URL)) /* Required to work with Sandesha2  */
+	else
 	{
-		/* Server reply to dual-channel client */
+		/* Single-channel blocking */
 		axutil_property_t* property = NULL;
 		property = axis2_msg_ctx_get_property(msg_ctx, env, 
 				AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
@@ -624,7 +701,7 @@
 		{
 			axis2_char_t* queue_name = (axis2_char_t*)
 				axutil_property_get_value(property, env);
-
+			
 			if (queue_name)
 			{
 				destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
@@ -633,25 +710,6 @@
 			}
 		}
 	}
-	else if ((substr = strstr(endpoint_address, "jms:/")) &&
-			 (substr == endpoint_address))
-	{
-
-	}
-	else if ((substr = strstr(endpoint_address, "TempQueue")) &&
-			 (substr == endpoint_address))
-	{
-		axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator, 
-				(sizeof(axis2_char_t) * strlen(endpoint_address)) + 1);
-		strcpy(queue_name, endpoint_address);
-		destination_info->queue_name = queue_name;
-	}
-	else
-	{
-		AXIS2_FREE(env->allocator, destination_info);
-		AXIS2_FREE(env->allocator, endpoint_address);
-		return NULL;
-	}
 
 	/* Get broker IP/Port from conf_ctx if they are not 
 	 * found in the destination URI */
@@ -682,7 +740,7 @@
 		}
 	}
 
-	if (AXIS2_QPID_NULL_BROKER_PORT == destination_info->broker_port)
+	if (AXIS2_QPID_NULL_CONF_INT == destination_info->broker_port)
 	{
 		axis2_conf_ctx_t* conf_ctx = NULL;
 
@@ -705,7 +763,103 @@
 		}
 	}
 
-	AXIS2_FREE(env->allocator, endpoint_address);
-
 	return destination_info;
 }
+
+
+AXIS2_EXTERN int AXIS2_CALL 
+axis2_amqp_util_msg_ctx_get_request_timeout(
+	axis2_msg_ctx_t* msg_ctx,
+	const axutil_env_t* env)
+{ /* Read the request-timeout property from msg_ctx's conf_ctx; fall back to the default when it is not available. */
+	axis2_conf_ctx_t* conf_ctx = NULL;
+	axutil_property_t* property = NULL;
+	void* value = NULL;
+	int request_timeout = AXIS2_QPID_DEFAULT_REQUEST_TIMEOUT; /* used when conf_ctx, the property or its value is missing */
+
+	conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+
+	if (conf_ctx)
+	{
+		property = axis2_conf_ctx_get_property(conf_ctx, env, 
+				AXIS2_AMQP_CONF_CTX_PROPERTY_REQUEST_TIMEOUT);
+		
+		if (property)
+		{
+			value = axutil_property_get_value(property, env);
+			
+			if (value)
+			{
+				request_timeout = *(int*)value; /* assumes the stored property value points to an int -- TODO confirm against the code that sets it */
+			}
+		}
+	}
+
+	return request_timeout;
+}
+
+
+AXIS2_EXTERN axis2_bool_t AXIS2_CALL 
+axis2_amqp_util_msg_ctx_get_server_side(
+	axis2_msg_ctx_t* msg_ctx,
+	const axutil_env_t* env)
+{ /* Convenience wrapper: fetch msg_ctx's conf_ctx and ask it for the server-side flag. */
+	axis2_conf_ctx_t* conf_ctx = NULL;
+	axis2_bool_t is_server = AXIS2_FALSE; /* result when msg_ctx has no conf_ctx */
+
+	conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+
+	if (conf_ctx)
+	{
+		is_server = 
+			axis2_amqp_util_conf_ctx_get_server_side(
+					conf_ctx, env);
+	}
+
+	return is_server;
+}
+
+
+AXIS2_EXTERN void AXIS2_CALL 
+axis2_amqp_response_free(
+	axis2_amqp_response_t* response,
+	const axutil_env_t* env)
+{ /* Release a response: its data, its content_type, then the struct itself, all via env->allocator. */
+	if (response) /* a NULL response is a no-op */
+	{
+		if (response->data)
+		{
+			AXIS2_FREE(env->allocator, response->data);
+		}
+
+		if (response->content_type)
+		{
+			AXIS2_FREE(env->allocator, response->content_type);
+		}
+
+		AXIS2_FREE(env->allocator, response); /* members are freed first, the container last */
+	}
+}
+
+
+AXIS2_EXTERN void AXIS2_CALL 
+axis2_amqp_destination_info_free(
+	axis2_amqp_destination_info_t* destination_info,
+	const axutil_env_t* env)
+{ /* Release a destination_info: its broker_ip, its queue_name, then the struct itself, all via env->allocator. */
+	if (destination_info) /* a NULL destination_info is a no-op */
+	{
+		if (destination_info->broker_ip)
+		{
+			AXIS2_FREE(env->allocator, destination_info->broker_ip);
+		}
+
+		if (destination_info->queue_name)
+		{
+			AXIS2_FREE(env->allocator, destination_info->queue_name);
+		}
+		
+		AXIS2_FREE(env->allocator, destination_info); /* members are freed first, the container last */
+	}
+}
+

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h?rev=721090&r1=721089&r2=721090&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h Wed Nov 26 22:24:07 2008
@@ -30,12 +30,12 @@
 {
 #endif
 
-	typedef struct axis2_amqp_binary_data_buffer
+	typedef struct axis2_amqp_response
 	{
 		void* data;
 		int length;
 		axis2_char_t* content_type;
-	} axis2_amqp_binary_data_buffer_t;
+	} axis2_amqp_response_t;
 
 	typedef struct axis2_amqp_destination_info
 	{
@@ -45,20 +45,32 @@
 	} axis2_amqp_destination_info_t;
 
 	AXIS2_EXTERN axis2_char_t* AXIS2_CALL
-	axis2_amqp_util_get_conf_value_string(
+	axis2_amqp_util_get_in_desc_conf_value_string(
 		axis2_transport_in_desc_t* in_desc,
 		const axutil_env_t* env,
 		const axis2_char_t* param_name);
 
 	AXIS2_EXTERN int AXIS2_CALL
-	axis2_amqp_util_get_conf_value_int(
+	axis2_amqp_util_get_in_desc_conf_value_int(
 		axis2_transport_in_desc_t* in_desc,
 		const axutil_env_t* env,
 		const axis2_char_t* param_name);
 
+	AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+	axis2_amqp_util_get_out_desc_conf_value_string(
+		axis2_transport_out_desc_t* out_desc,
+		const axutil_env_t* env,
+		const axis2_char_t* param_name);
+
+	AXIS2_EXTERN int AXIS2_CALL
+	axis2_amqp_util_get_out_desc_conf_value_int(
+		axis2_transport_out_desc_t* out_desc,
+		const axutil_env_t* env,
+		const axis2_char_t* param_name);
+
 	AXIS2_EXTERN axiom_soap_envelope_t* AXIS2_CALL
 	axis2_amqp_util_get_soap_envelope(
-		axis2_amqp_binary_data_buffer_t* binary_data_buffer,
+		axis2_amqp_response_t* response,
 		const axutil_env_t* env,
 		axis2_msg_ctx_t* msg_ctx);
 
@@ -104,6 +116,26 @@
 		axis2_msg_ctx_t* msg_ctx,
 		const axutil_env_t* env);
 
+	AXIS2_EXTERN int AXIS2_CALL 
+	axis2_amqp_util_msg_ctx_get_request_timeout(
+		axis2_msg_ctx_t* msg_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN axis2_bool_t AXIS2_CALL 
+	axis2_amqp_util_msg_ctx_get_server_side(
+		axis2_msg_ctx_t* msg_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN void AXIS2_CALL 
+	axis2_amqp_response_free(
+		axis2_amqp_response_t* response,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN void AXIS2_CALL 
+	axis2_amqp_destination_info_free(
+		axis2_amqp_destination_info_t* destination_info,
+		const axutil_env_t* env);
+
 #ifdef __cplusplus
 }
 #endif