You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by sh...@apache.org on 2008/11/20 07:28:38 UTC

svn commit: r719179 [3/3] - in /webservices/axis2/trunk/c: samples/ samples/client/amqp/ samples/client/amqp/echo/ samples/client/amqp/mtom/ samples/client/amqp/mtom/resources/ samples/client/amqp/notify/ src/core/transport/amqp/receiver/ src/core/tran...

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.c Wed Nov 19 22:28:35 2008
@@ -16,129 +16,696 @@
  */
 
 #include <axiom.h>
+#include <axiom_mime_parser.h>
+#include <axis2_util.h>
+#include <axis2_addr.h>
+#include <axutil_http_chunked_stream.h>
+#include <axis2_amqp_defines.h>
 #include <axis2_amqp_util.h>
 
-axis2_char_t* AXIS2_CALL
-axis2_amqp_util_get_conf_value_string (axis2_transport_in_desc_t* in_desc,
-						  	           const axutil_env_t* 	   	  env,
-						               const axis2_char_t*	 	  param_name)
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+axis2_amqp_util_get_conf_value_string(
+	axis2_transport_in_desc_t* in_desc,
+	const axutil_env_t* env,
+	const axis2_char_t* param_name)
 {
 	axutil_param_t* param = NULL;
-    axis2_char_t* 	value = NULL;
+    axis2_char_t* value = NULL;
 
 	param = (axutil_param_t*)
-			axutil_param_container_get_param (axis2_transport_in_desc_param_container (in_desc, env),
-											  env,
-											  param_name);
+			axutil_param_container_get_param(
+				axis2_transport_in_desc_param_container(in_desc, env),
+				env,
+				param_name);
     if (param)
     {
-		value = axutil_param_get_value (param, env);
+		value = axutil_param_get_value(param, env);
     }
 
 	return value;
 }
 
 
-int AXIS2_CALL
-axis2_amqp_util_get_conf_value_int (axis2_transport_in_desc_t* in_desc,
-						            const axutil_env_t* 	   env,
-						            const axis2_char_t*		   param_name)
+AXIS2_EXTERN int AXIS2_CALL
+axis2_amqp_util_get_conf_value_int(
+	axis2_transport_in_desc_t* in_desc,
+	const axutil_env_t* env,
+	const axis2_char_t* param_name)
 {
 	axis2_char_t* value_str = NULL;
-	int 		  value = 0;
+	int value = 0;
 
-	value_str = axis2_amqp_util_get_conf_value_string (in_desc, env, param_name);
+	value_str = axis2_amqp_util_get_conf_value_string(
+			in_desc, env, param_name);
 	if (value_str)
 	{
-		value = atoi (value_str);
+		value = atoi(value_str);
 	}
 
 	return value;
 }
 
 
-axiom_soap_envelope_t* AXIS2_CALL
-axis2_amqp_util_get_soap_envelope (const axis2_char_t* content,
-								   const axutil_env_t* env)
+AXIS2_EXTERN axiom_soap_envelope_t* AXIS2_CALL
+axis2_amqp_util_get_soap_envelope(
+	axis2_amqp_binary_data_buffer_t* binary_data_buffer,
+	const axutil_env_t* env,
+	axis2_msg_ctx_t* msg_ctx)
 {
 	axiom_xml_reader_t* xml_reader = NULL;
 	axiom_stax_builder_t* stax_builder = NULL;
 	axiom_soap_builder_t* soap_builder = NULL;
 	axiom_soap_envelope_t* soap_envelope = NULL;
+	const axis2_char_t* soap_ns_uri = NULL;
+	axis2_char_t *soap_body_str = NULL;
+	int soap_body_len = 0;
+	axis2_bool_t is_mtom = AXIS2_FALSE;
+	axutil_hash_t *binary_data_map = NULL;
+	axis2_bool_t is_soap_11 = AXIS2_FALSE;
 
-	xml_reader = axiom_xml_reader_create_for_memory (env,
-                                                     (axis2_char_t*)content,
-                                                     axutil_strlen (content),
-                                                     NULL,
-                                                     AXIS2_XML_PARSER_TYPE_BUFFER);
+	if (!binary_data_buffer || !binary_data_buffer->data || !binary_data_buffer->content_type)
+	{
+		return NULL;
+	}
+
+	is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
+
+	/* Handle MTOM */
+	if (strstr(binary_data_buffer->content_type, AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED))
+	{
+		axis2_char_t* mime_boundary = axis2_amqp_util_get_value_from_content_type(env,
+				binary_data_buffer->content_type,
+				AXIS2_AMQP_HEADER_CONTENT_TYPE_MIME_BOUNDARY);
+		
+		if (mime_boundary)
+		{
+			axiom_mime_parser_t *mime_parser = NULL;
+			int soap_body_len = 0;
+			axutil_param_t *buffer_size_param = NULL;
+			axutil_param_t *max_buffers_param = NULL;
+			axutil_param_t *attachment_dir_param = NULL;
+			axis2_char_t *value_size = NULL;
+			axis2_char_t *value_num = NULL;
+			axis2_char_t *value_dir = NULL;
+			int size = 0;
+			int num = 0;
+			
+			mime_parser = axiom_mime_parser_create(env);
+			
+			buffer_size_param = axis2_msg_ctx_get_parameter(msg_ctx, 
+					env, AXIS2_MTOM_BUFFER_SIZE);
+			
+			if (buffer_size_param)
+			{
+				value_size = (axis2_char_t *)axutil_param_get_value(buffer_size_param, env);
+				
+				if (value_size)
+				{
+					size = atoi(value_size);
+					axiom_mime_parser_set_buffer_size(mime_parser, env, size);
+				}
+			}
+
+            max_buffers_param = axis2_msg_ctx_get_parameter(msg_ctx, 
+					env, AXIS2_MTOM_MAX_BUFFERS);
+
+            if (max_buffers_param)
+            {
+                value_num = (axis2_char_t*)axutil_param_get_value(max_buffers_param, env);
+
+                if(value_num)
+                {
+                    num = atoi(value_num);
+                    axiom_mime_parser_set_max_buffers(mime_parser, env, num);
+                }
+            }
+
+            attachment_dir_param = axis2_msg_ctx_get_parameter(msg_ctx,
+					env, AXIS2_ATTACHMENT_DIR);
+
+            if (attachment_dir_param)
+            {
+                value_dir = (axis2_char_t*)axutil_param_get_value(attachment_dir_param, env);
+
+                if (value_dir)
+                {
+                    axiom_mime_parser_set_attachment_dir(mime_parser, env, value_dir);
+                }
+            }
+
+            if (mime_parser)
+            {
+				axis2_callback_info_t* callback_ctx = NULL;
+				axutil_stream_t* stream = NULL;
+
+				callback_ctx = (axis2_callback_info_t*)
+					AXIS2_MALLOC(env->allocator, sizeof(axis2_callback_info_t));
+
+            	stream = axutil_stream_create_basic(env);
+
+            	if (stream)
+            	{
+					axutil_stream_write(stream, env, binary_data_buffer->data, 
+							binary_data_buffer->length);
+                	callback_ctx->env = env;
+                	callback_ctx->in_stream = stream;
+                	callback_ctx->content_length = binary_data_buffer->length;
+                	callback_ctx->unread_len = binary_data_buffer->length;
+                	callback_ctx->chunked_stream = NULL;
+            	}
+
+                binary_data_map = 
+                    axiom_mime_parser_parse(mime_parser, env,
+                                            axis2_amqp_util_on_data_request,
+                                            (void*)callback_ctx,
+                                            mime_boundary);
+
+                if (!binary_data_map)
+                {
+                    return AXIS2_FAILURE;
+                }
+
+				soap_body_len =
+                    axiom_mime_parser_get_soap_body_len(mime_parser, env);
+
+                soap_body_str =
+                    axiom_mime_parser_get_soap_body_str(mime_parser, env);
+
+				axutil_stream_free(stream, env);
+				AXIS2_FREE(env->allocator, callback_ctx);
+				axiom_mime_parser_free(mime_parser, env);
+            }
+
+			AXIS2_FREE(env->allocator, mime_boundary);
+		}
+
+		is_mtom = AXIS2_TRUE;
+	}
+	else
+	{
+		soap_body_str = binary_data_buffer->data;
+		soap_body_len = axutil_strlen(binary_data_buffer->data);
+	}
+
+	soap_body_len = axutil_strlen(soap_body_str);
+
+	xml_reader = axiom_xml_reader_create_for_memory(env, soap_body_str,
+			soap_body_len, NULL, AXIS2_XML_PARSER_TYPE_BUFFER);
     if (!xml_reader)
     {
-        AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create XML Reader");
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create XML Reader");
         return NULL;
     }
 
-    stax_builder = axiom_stax_builder_create (env, xml_reader);
+    stax_builder = axiom_stax_builder_create(env, xml_reader);
     if (!stax_builder)
     {
-        AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create StAX Builder");
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create StAX Builder");
         return NULL;
     }
 
-    soap_builder = axiom_soap_builder_create (env,
-                                              stax_builder,
-                                              AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI);
+	soap_ns_uri = is_soap_11 ? AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI : 
+		AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
+
+    soap_builder = axiom_soap_builder_create(env, stax_builder, soap_ns_uri);
     if (!soap_builder)
     {
-        AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create SOAP Builder");
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create SOAP Builder");
         return NULL;
     }
-	
-	soap_envelope = axiom_soap_builder_get_soap_envelope (soap_builder, env);
 
+	if (binary_data_map)
+	{
+		axiom_soap_builder_set_mime_body_parts(soap_builder, env, binary_data_map);
+	}
+
+	soap_envelope = axiom_soap_builder_get_soap_envelope(soap_builder, env);
+	
+	if (soap_envelope)
+	{
+		/* hack to get around MTOM problem */
+		axiom_soap_body_t *soap_body = 
+			axiom_soap_envelope_get_body(soap_envelope, env);
+		
+		if (soap_body)
+		{
+			axiom_soap_body_has_fault(soap_body, env);
+		}
+	}
+	
 	return soap_envelope;
 }
 
 
-axutil_url_t* AXIS2_CALL
-axis2_amqp_util_get_qpid_broker_url (axis2_msg_ctx_t*    msg_ctx,
-									 const axutil_env_t* env)
+AXIS2_EXTERN axis2_bool_t AXIS2_CALL
+axis2_amqp_util_conf_ctx_get_server_side(
+	axis2_conf_ctx_t* conf_ctx,
+	const axutil_env_t* env)
 {
-	axis2_endpoint_ref_t* endpoint_ref = NULL;
-	const axis2_char_t* endpoint_ref_str = NULL;
+	axutil_property_t* property = NULL;
+	axis2_char_t* value = NULL;
+	
+	property = axis2_conf_ctx_get_property(conf_ctx, env, AXIS2_IS_SVR_SIDE);
+	if (!property)
+		return AXIS2_TRUE;
+
+	value = (axis2_char_t*)axutil_property_get_value(property, env);
+	if (!value)
+		return AXIS2_TRUE;
 
-	endpoint_ref = axis2_msg_ctx_get_to (msg_ctx, env);
-	if (!endpoint_ref)
-		return NULL;
+	return (axutil_strcmp(value, AXIS2_VALUE_TRUE) == 0) ? AXIS2_TRUE : AXIS2_FALSE;
+}
+
+AXIS2_EXTERN axis2_char_t *AXIS2_CALL
+axis2_amqp_util_get_value_from_content_type(
+    const axutil_env_t * env,
+    const axis2_char_t * content_type,
+    const axis2_char_t * key)
+{
+    axis2_char_t *tmp = NULL;
+    axis2_char_t *tmp_content_type = NULL;
+    axis2_char_t *tmp2 = NULL;
+
+    AXIS2_PARAM_CHECK(env->error, content_type, NULL);
+    AXIS2_PARAM_CHECK(env->error, key, NULL);
+
+    tmp_content_type = axutil_strdup(env, content_type);
+
+    if (!tmp_content_type)
+    {
+        return NULL;
+    }
+
+    tmp = strstr(tmp_content_type, key);
+
+    if (!tmp)
+    {
+        AXIS2_FREE(env->allocator, tmp_content_type);
+        return NULL;
+    }
+
+    tmp = strchr(tmp, AXIS2_AMQP_EQ);
+    tmp2 = strchr(tmp, AXIS2_AMQP_SEMI_COLON);
+
+    if (tmp2)
+    {
+        *tmp2 = AXIS2_AMQP_ESC_NULL;
+    }
+
+    if (!tmp)
+    {
+        AXIS2_FREE(env->allocator, tmp_content_type);
+        return NULL;
+    }
+
+    tmp2 = axutil_strdup(env, tmp + 1);
+
+    AXIS2_FREE(env->allocator, tmp_content_type);
+
+    if (*tmp2 == AXIS2_AMQP_DOUBLE_QUOTE)
+    {
+        tmp = tmp2;
+        tmp2 = axutil_strdup(env, tmp + 1);
+        tmp2[strlen(tmp2) - 1] = AXIS2_AMQP_ESC_NULL;
+        
+	    if(tmp)
+        {
+            AXIS2_FREE(env->allocator, tmp); 
+            tmp = NULL;
+        }
+    }
+
+    /* handle XOP */
+    if(*tmp2 == AXIS2_AMQP_B_SLASH && *(tmp2 + 1) == '\"')
+    {
+        tmp = tmp2;
+        tmp2 = axutil_strdup(env, tmp + 2);
+        tmp2[strlen(tmp2) - 3] = AXIS2_AMQP_ESC_NULL;
+
+        if(tmp)
+        {
+            AXIS2_FREE(env->allocator, tmp);
+            tmp = NULL;
+        }
+    }
+
+    return tmp2;
+}
+
+
+AXIS2_EXTERN int AXIS2_CALL
+axis2_amqp_util_on_data_request(
+    char* buffer,
+    int size,
+    void* ctx)
+{
+    const axutil_env_t* env = NULL;
+    int len = -1;
+    axis2_callback_info_t* cb_ctx = (axis2_callback_info_t*)ctx;
+    axutil_stream_t* in_stream = NULL;
+
+    if (!buffer || !ctx)
+    {
+        return 0;
+    }
+
+	if (cb_ctx->unread_len <= 0 && -1 != cb_ctx->content_length)
+    {
+        return 0;
+    }
+
+	env = ((axis2_callback_info_t*)ctx)->env;
+
+    in_stream = (axutil_stream_t*)((axis2_callback_info_t *) ctx)->in_stream;
+    --size;                         /* reserve space to insert trailing null */
+
+    len = axutil_stream_read(in_stream, env, buffer, size);
+
+	if (len > 0)
+	{
+		buffer[len] = AXIS2_AMQP_ESC_NULL;
+		((axis2_callback_info_t*)ctx)->unread_len -= len;
+    }
+    else if(len == 0)
+    {
+		((axis2_callback_info_t*)ctx)->unread_len = 0;
+	}
+
+    return len;
+}
+
+
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL 
+axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(
+	axis2_conf_ctx_t* conf_ctx,
+	const axutil_env_t* env)
+{
+	axutil_property_t* property = NULL;
+	axis2_char_t* queue_name = NULL;
+	axis2_char_t* value = NULL;
 	
-	endpoint_ref_str = axis2_endpoint_ref_get_address (endpoint_ref, env);
-	if (!endpoint_ref_str)
-		return NULL;
+	/* Get property */
+	property = axis2_conf_ctx_get_property(conf_ctx, env, 
+			AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME);
+	if (!property) /* Very first call */
+	{
+		property = axutil_property_create(env);
+		
+		axis2_conf_ctx_set_property(conf_ctx, env, 
+				AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME, property);
+	}
+
+	/* Get queue name */
+	value = (axis2_char_t*)axutil_property_get_value(property, env);
 	
-	return axutil_url_parse_string (env, endpoint_ref_str);
+	/* AMQP listener and the sender are the two parties that are 
+	 * interested in the queue. Either party can create the queue.
+	 * If the queue is already created by one party, "value" is 
+	 * not NULL. If "value" is NULL, that mean the caller of 
+	 * this method is supposed to create the queue */
+	if (value)
+	{
+		queue_name = (axis2_char_t*)
+			AXIS2_MALLOC(env->allocator, axutil_strlen(value) + 1);
+		strcpy(queue_name, value);
+
+		axutil_property_set_value(property, env, NULL);
+	}
+	else
+	{
+		/* Create new queue name */
+		queue_name = axutil_stracat(env, AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX, 
+				axutil_uuid_gen(env));
+		
+		/* Put queue name in the conf_ctx so that the sender will know */
+		axutil_property_set_value(property, env, (void*)queue_name);
+	}
+
+	return queue_name;
 }
 
 
-axis2_char_t* AXIS2_CALL
-axis2_amqp_util_get_qpid_broker_ip (axis2_msg_ctx_t*    msg_ctx,
-		                            const axutil_env_t* env)
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL 
+axis2_amqp_util_conf_ctx_get_qpid_broker_ip(
+	axis2_conf_ctx_t* conf_ctx,
+	const axutil_env_t* env)
 {
-	axutil_url_t* endpoint_url = axis2_amqp_util_get_qpid_broker_url (msg_ctx, env);
+	axutil_property_t* property = NULL;
+	axis2_char_t* broker_ip = NULL;
+
+	property = axis2_conf_ctx_get_property(conf_ctx, env, 
+			AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP);
 
-	if (!endpoint_url)
+	if (!property)
 		return NULL;
 
-	return axutil_url_get_host (endpoint_url, env);
+	broker_ip = (axis2_char_t*)axutil_property_get_value(property, env);
+
+	return broker_ip;
+}
+
+
+AXIS2_EXTERN int AXIS2_CALL 
+axis2_amqp_util_conf_ctx_get_qpid_broker_port(
+	axis2_conf_ctx_t* conf_ctx,
+	const axutil_env_t* env)
+{
+	axutil_property_t* property = NULL;
+	void* value = NULL;
+	int broker_port = 0;
+
+	property = axis2_conf_ctx_get_property(conf_ctx, env, 
+			AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT);
+
+	if (property)
+	{
+		value = axutil_property_get_value(property, env);
+
+		if (value)
+		{
+			broker_port = *(int*)value;
+		}
+	}
+
+	return broker_port;
+}
+
+
+AXIS2_EXTERN axis2_bool_t AXIS2_CALL 
+axis2_amqp_util_msg_ctx_get_use_separate_listener(
+	axis2_msg_ctx_t* msg_ctx,
+	const axutil_env_t* env)
+{
+	axutil_property_t* property = NULL;
+	axis2_char_t* value = NULL;
+	axis2_bool_t use_separate_listener = AXIS2_FALSE;
+
+	property = axis2_msg_ctx_get_property(msg_ctx, env, AXIS2_USE_SEPARATE_LISTENER);
+	
+	if (property)
+	{
+		value = (axis2_char_t*)axutil_property_get_value(property, env);
+		
+		if (value && (axutil_strcmp(AXIS2_VALUE_TRUE, value) == 0))
+		{
+			use_separate_listener = AXIS2_TRUE;
+		}
+	}
+	
+	return use_separate_listener;
 }
 
 
-int AXIS2_CALL
-axis2_amqp_util_get_qpid_broker_port (axis2_msg_ctx_t*    msg_ctx,
-		                              const axutil_env_t* env)
+AXIS2_EXTERN axis2_amqp_destination_info_t* AXIS2_CALL 
+axis2_amqp_util_msg_ctx_get_destination_info(
+	axis2_msg_ctx_t* msg_ctx,
+	const axutil_env_t* env)
 {
-	axutil_url_t* endpoint_url = axis2_amqp_util_get_qpid_broker_url (msg_ctx, env);
+	/* The destination URI that is expected by this method 
+	 * should be of one of the following formats 
+	 * 1. amqp://IP:PORT/services/SERVICE_NAME 
+	 * 2. jms:/SERVICE_NAME?java.naming.provider.url=tcp://IP:PORT...
+	 * 3. TempQueue... */
+
+	axis2_endpoint_ref_t* endpoint_ref = NULL;
+	const axis2_char_t* endpoint_address_original = NULL;
+	axis2_char_t* endpoint_address = NULL;
+	axis2_amqp_destination_info_t* destination_info = NULL;
+	char* substr = NULL;
+	char* token = NULL;
+
+	endpoint_ref = axis2_msg_ctx_get_to(msg_ctx, env);
+	if (!endpoint_ref)
+		return NULL;
+
+	endpoint_address_original = axis2_endpoint_ref_get_address(endpoint_ref, env);
+	if (!endpoint_address_original)
+		return NULL;
+
+	endpoint_address = (axis2_char_t*)AXIS2_MALLOC(env->allocator, 
+			(sizeof(axis2_char_t) * axutil_strlen(endpoint_address_original)) + 1);
+	strcpy((char*)endpoint_address, (char*)endpoint_address_original);
+
+	destination_info = (axis2_amqp_destination_info_t*)
+		AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_destination_info_t));
+
+	destination_info->broker_ip = NULL;
+	destination_info->broker_port = AXIS2_QPID_NULL_BROKER_PORT;
+	destination_info->queue_name = NULL;
+
+	if ((substr = strstr(endpoint_address, AXIS2_AMQP_EPR_PREFIX))) /* Start with amqp: */ 
+	{
+		if (strstr(endpoint_address, AXIS2_AMQP_EPR_ANON_SERVICE_NAME))
+		{
+			/* Server reply to dual-channel client */
+			axutil_property_t* property = NULL;
+            property = axis2_msg_ctx_get_property(msg_ctx, env,
+                    AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
+
+            if (property)
+            {
+                axis2_char_t* queue_name = (axis2_char_t*)
+                    axutil_property_get_value(property, env);
+
+				if (queue_name)
+				{
+					destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
+							env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
+					strcpy(destination_info->queue_name, queue_name);
+				}
+            }
+		}
+		else
+		{
+			substr+= strlen(AXIS2_AMQP_EPR_PREFIX) + 2; /* 2 -> "//" */
+			if (substr) /* IP:PORT/services/SERVICE_NAME */
+			{
+				token = strtok(substr, ":");
+				if (token) /* IP */
+				{
+					axis2_char_t* broker_ip = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+							(sizeof(axis2_char_t) * strlen(token)) + 1);
+					strcpy(broker_ip, token);
+					destination_info->broker_ip = broker_ip;
+
+					token = strtok(NULL, "/"); /* PORT */
+					if (token)
+					{
+						destination_info->broker_port = atoi(token);
+
+						token = strtok(NULL, "#"); /* ... services/SERVICE_NAME */
+						if (token)
+						{
+							if ((substr = strstr(token, AXIS2_AMQP_EPR_SERVICE_PREFIX)))
+							{
+								substr+= strlen(AXIS2_AMQP_EPR_SERVICE_PREFIX) + 1; /* 1 -> "/" */ 
+								if (substr)
+								{
+									axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+											(sizeof(axis2_char_t) * strlen(substr)) + 1);
+									strcpy(queue_name, substr);
+									destination_info->queue_name = queue_name;
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+	else if (0 == strcmp(endpoint_address, AXIS2_WSA_ANONYMOUS_URL)) /* Required to work with Sandesha2  */
+	{
+		/* Server reply to dual-channel client */
+		axutil_property_t* property = NULL;
+		property = axis2_msg_ctx_get_property(msg_ctx, env, 
+				AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
+		
+		if (property)
+		{
+			axis2_char_t* queue_name = (axis2_char_t*)
+				axutil_property_get_value(property, env);
+
+			if (queue_name)
+			{
+				destination_info->queue_name = (axis2_char_t*)AXIS2_MALLOC(
+						env->allocator, (sizeof(axis2_char_t) * strlen(queue_name)) + 1);
+				strcpy(destination_info->queue_name, queue_name);
+			}
+		}
+	}
+	else if ((substr = strstr(endpoint_address, "jms:/")) &&
+			 (substr == endpoint_address))
+	{
+
+	}
+	else if ((substr = strstr(endpoint_address, "TempQueue")) &&
+			 (substr == endpoint_address))
+	{
+		axis2_char_t* queue_name = (axis2_char_t*)AXIS2_MALLOC(env->allocator, 
+				(sizeof(axis2_char_t) * strlen(endpoint_address)) + 1);
+		strcpy(queue_name, endpoint_address);
+		destination_info->queue_name = queue_name;
+	}
+	else
+	{
+		AXIS2_FREE(env->allocator, destination_info);
+		AXIS2_FREE(env->allocator, endpoint_address);
+		return NULL;
+	}
+
+	/* Get broker IP/Port from conf_ctx if they are not 
+	 * found in the destination URI */
+	if (!destination_info->broker_ip)
+	{
+		axis2_conf_ctx_t* conf_ctx = NULL;
+
+		conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+		if (conf_ctx)
+		{
+			axutil_property_t* property = NULL;
+			property = axis2_conf_ctx_get_property(conf_ctx, env, 
+					AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP);
+
+			if (property)
+			{
+				axis2_char_t* broker_ip = (axis2_char_t*)
+					axutil_property_get_value(property, env);
+
+				if (broker_ip)
+				{
+					destination_info->broker_ip = (axis2_char_t*)AXIS2_MALLOC(
+							env->allocator, (sizeof(axis2_char_t) * strlen(broker_ip)) + 1);
+					strcpy(destination_info->broker_ip, broker_ip);
+				}
+			}
+
+		}
+	}
+
+	if (AXIS2_QPID_NULL_BROKER_PORT == destination_info->broker_port)
+	{
+		axis2_conf_ctx_t* conf_ctx = NULL;
+
+		conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+		if (conf_ctx)
+		{
+			axutil_property_t* property = NULL;
+			property = axis2_conf_ctx_get_property(conf_ctx, env, 
+					AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT);
+
+			if (property)
+			{
+				void* value = axutil_property_get_value(property, env);
+
+				if (value)
+				{
+					destination_info->broker_port = *(int*)value;
+				}
+			}
+		}
+	}
 
-	if (!endpoint_url)
-		return -1;
+	AXIS2_FREE(env->allocator, endpoint_address);
 
-	return axutil_url_get_port (endpoint_url, env);;
+	return destination_info;
 }

Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_util.h Wed Nov 19 22:28:35 2008
@@ -22,32 +22,90 @@
 #include <axutil_param_container.h>
 #include <axiom_soap.h>
 #include <axis2_conf_ctx.h>
-#include <axutil_url.h>
+#include <axutil_stream.h>
+#include <axis2_amqp_defines.h>
 
-axis2_char_t* AXIS2_CALL
-axis2_amqp_util_get_conf_value_string (axis2_transport_in_desc_t* in_desc,
-						  	      	   const axutil_env_t* 	   	  env,
-						               const axis2_char_t*		  param_name);
-
-int AXIS2_CALL
-axis2_amqp_util_get_conf_value_int (axis2_transport_in_desc_t* in_desc,
-						       		const axutil_env_t* 	   env,
-						       		const axis2_char_t*		   param_name);
-
-axiom_soap_envelope_t* AXIS2_CALL
-axis2_amqp_util_get_soap_envelope (const axis2_char_t* content,
-								   const axutil_env_t* env);
-
-axutil_url_t* AXIS2_CALL
-axis2_amqp_util_get_qpid_broker_url (axis2_msg_ctx_t*    msg_ctx,
-									 const axutil_env_t* env);
-
-axis2_char_t* AXIS2_CALL
-axis2_amqp_util_get_qpid_broker_ip (axis2_msg_ctx_t*    msg_ctx,
-									const axutil_env_t* env);
-
-int AXIS2_CALL
-axis2_amqp_util_get_qpid_broker_port (axis2_msg_ctx_t*    msg_ctx,
-									  const axutil_env_t* env);
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+	typedef struct axis2_amqp_binary_data_buffer
+	{
+		void* data;
+		int length;
+		axis2_char_t* content_type;
+	} axis2_amqp_binary_data_buffer_t;
+
+	typedef struct axis2_amqp_destination_info
+	{
+		axis2_char_t* broker_ip;
+		int broker_port;
+		axis2_char_t* queue_name;
+	} axis2_amqp_destination_info_t;
+
+	AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+	axis2_amqp_util_get_conf_value_string(
+		axis2_transport_in_desc_t* in_desc,
+		const axutil_env_t* env,
+		const axis2_char_t* param_name);
+
+	AXIS2_EXTERN int AXIS2_CALL
+	axis2_amqp_util_get_conf_value_int(
+		axis2_transport_in_desc_t* in_desc,
+		const axutil_env_t* env,
+		const axis2_char_t* param_name);
+
+	AXIS2_EXTERN axiom_soap_envelope_t* AXIS2_CALL
+	axis2_amqp_util_get_soap_envelope(
+		axis2_amqp_binary_data_buffer_t* binary_data_buffer,
+		const axutil_env_t* env,
+		axis2_msg_ctx_t* msg_ctx);
+
+	AXIS2_EXTERN axis2_bool_t AXIS2_CALL
+	axis2_amqp_util_conf_ctx_get_server_side(
+		axis2_conf_ctx_t* conf_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+	axis2_amqp_util_get_value_from_content_type(
+		const axutil_env_t * env,
+		const axis2_char_t * content_type,
+		const axis2_char_t * key);
+
+	AXIS2_EXTERN int AXIS2_CALL 
+	axis2_amqp_util_on_data_request(
+		char *buffer,
+		int size,
+		void *ctx);
+
+	AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+	axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(
+		axis2_conf_ctx_t* conf_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+	axis2_amqp_util_conf_ctx_get_qpid_broker_ip(
+		axis2_conf_ctx_t* conf_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN int AXIS2_CALL
+	axis2_amqp_util_conf_ctx_get_qpid_broker_port(
+		axis2_conf_ctx_t* conf_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN axis2_bool_t AXIS2_CALL
+	axis2_amqp_util_msg_ctx_get_use_separate_listener(
+		axis2_msg_ctx_t* msg_ctx,
+		const axutil_env_t* env);
+
+	AXIS2_EXTERN axis2_amqp_destination_info_t* AXIS2_CALL 
+	axis2_amqp_util_msg_ctx_get_destination_info(
+		axis2_msg_ctx_t* msg_ctx,
+		const axutil_env_t* env);
+
+#ifdef __cplusplus
+}
+#endif
 
 #endif