You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by sh...@apache.org on 2008/11/20 07:28:38 UTC
svn commit: r719179 [2/3] - in /webservices/axis2/trunk/c: samples/
samples/client/amqp/ samples/client/amqp/echo/ samples/client/amqp/mtom/
samples/client/amqp/mtom/resources/ samples/client/amqp/notify/
src/core/transport/amqp/receiver/ src/core/tran...
Added: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp?rev=719179&view=auto
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp (added)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp Wed Nov 19 22:28:35 2008
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * tcp://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <axis2_amqp_request_processor.h>
+#include <axis2_amqp_defines.h>
+#include <axis2_amqp_util.h>
+#include <axis2_qpid_receiver_listener.h>
+#include <axis2_qpid_receiver.h>
+#include <string>
+
+Axis2QpidReceiverListener::Axis2QpidReceiverListener(
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx)
+{
+ this->env = env;
+ this->conf_ctx = conf_ctx;
+}
+
+
+Axis2QpidReceiverListener::~Axis2QpidReceiverListener(void)
+{}
+
+
+void Axis2QpidReceiverListener::received(Message& message)
+{
+ AXIS2_ENV_CHECK(env, void);
+
+ axis2_amqp_request_processor_resource_pack_t* request_data = NULL;
+#ifdef AXIS2_SVR_MULTI_THREADED
+ axutil_thread_t* worker_thread = NULL;
+#endif
+
+ request_data = (axis2_amqp_request_processor_resource_pack_t*)
+ AXIS2_MALLOC(env->allocator,
+ sizeof(axis2_amqp_request_processor_resource_pack_t));
+
+ if (!request_data)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Memory Allocation Error");
+ return;
+ }
+
+ request_data->env = (axutil_env_t*)env;
+ request_data->conf_ctx = conf_ctx;
+
+ /* Create a Local Copy of Request Content */
+ std::string message_data = message.getData();
+ axis2_char_t* request_content = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ message_data.size());
+ memcpy(request_content, message_data.c_str(), message_data.size());
+
+ request_data->request_content = request_content;
+ request_data->content_length = message_data.size();
+
+ /* Set ReplyTo */
+ request_data->reply_to = NULL;
+ if (message.getMessageProperties().hasReplyTo())
+ {
+ /* Create a Local Copy of ReplyTo */
+ std::string reply_to_tmp = message.getMessageProperties().getReplyTo().getRoutingKey();
+ axis2_char_t* reply_to = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ reply_to_tmp.size() + 1);
+ strcpy(reply_to, reply_to_tmp.c_str());
+
+ request_data->reply_to = reply_to;
+ }
+
+ /* Copy AMQP headers */
+ /* Content-Type */
+ request_data->content_type = NULL;
+ std::string content_type_tmp = message.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
+ if (!content_type_tmp.empty())
+ {
+ axis2_char_t* content_type = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ content_type_tmp.size() + 1);
+ strcpy(content_type, content_type_tmp.c_str());
+
+ request_data->content_type = content_type;
+ }
+
+ /* SOAPAction */
+ request_data->soap_action = NULL;
+ std::string soap_action_tmp = message.getHeaders().getString(AXIS2_AMQP_HEADER_SOAP_ACTION);
+ if (!soap_action_tmp.empty())
+ {
+ axis2_char_t* soap_action = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ soap_action_tmp.size() + 1);
+ strcpy(soap_action, soap_action_tmp.c_str());
+
+ request_data->soap_action = soap_action;
+ }
+
+#ifdef AXIS2_SVR_MULTI_THREADED
+ worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
+ axis2_amqp_request_processor_thread_function,
+ (void*)request_data);
+
+ if (!worker_thread)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create Thread");
+ return;
+ }
+
+ axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
+#else
+ axis2_amqp_request_processor_thread_function(NULL, (void*)request_data);
+#endif
+}
Added: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h?rev=719179&view=auto
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h (added)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h Wed Nov 19 22:28:35 2008
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * tcp://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef AXIS2_QPID_RECEIVER_LISTENER_H
+#define AXIS2_QPID_RECEIVER_LISTENER_H
+
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/Message.h>
+#include <axutil_env.h>
+#include <axis2_conf_init.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+class Axis2QpidReceiverListener : public MessageListener
+{
+ public:
+ Axis2QpidReceiverListener(const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx);
+ ~Axis2QpidReceiverListener(void);
+
+ private:
+ virtual void received(Message& message);
+
+ const axutil_env_t* env;
+ axis2_conf_ctx_t* conf_ctx;
+};
+
+#endif
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/Makefile.am?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/Makefile.am (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/Makefile.am Wed Nov 19 22:28:35 2008
@@ -2,7 +2,8 @@
libaxis2_amqp_request_processor_la_SOURCES = axis2_amqp_request_processor.c
-libaxis2_amqp_request_processor_la_LIBADD = $(top_builddir)/util/src/libaxutil.la
+libaxis2_amqp_request_processor_la_LIBADD = $(top_builddir)/util/src/libaxutil.la \
+ $(top_builddir)/src/core/transport/amqp/util/libaxis2_amqp_util.la
libaxis2_amqp_request_processor_la_LDFLAGS = -version-info $(VERSION_NO)
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.c Wed Nov 19 22:28:35 2008
@@ -19,12 +19,16 @@
#include <axiom.h>
#include <axiom_soap.h>
#include <axis2_engine.h>
+#include <axiom_mime_parser.h>
+#include <axutil_http_chunked_stream.h>
#include <axis2_amqp_defines.h>
+#include <axis2_amqp_util.h>
#include <axis2_amqp_request_processor.h>
void* AXIS2_THREAD_FUNC
-axis2_amqp_request_processor_thread_function (axutil_thread_t* thread,
- void* request_data)
+axis2_amqp_request_processor_thread_function(
+ axutil_thread_t* thread,
+ void* request_data)
{
axis2_status_t status = AXIS2_FAILURE;
axutil_env_t* env = NULL;
@@ -33,30 +37,33 @@
#ifndef WIN32
#ifdef AXIS2_SVR_MULTI_THREADED
- signal (SIGPIPE, SIG_IGN);
+ signal(SIGPIPE, SIG_IGN);
#endif
#endif
request_resource_pack = (axis2_amqp_request_processor_resource_pack_t*)request_data;
env = request_resource_pack->env;
- thread_env = axutil_init_thread_env (env);
+ thread_env = axutil_init_thread_env(env);
/* Process Request */
- status = axis2_amqp_process_request (thread_env, request_resource_pack);
+ status = axis2_amqp_process_request(thread_env, request_resource_pack);
if (status == AXIS2_SUCCESS)
{
- AXIS2_LOG_INFO (thread_env->log, "Request Processed Successfully");
+ AXIS2_LOG_INFO(thread_env->log, "Request Processed Successfully");
}
else
{
- AXIS2_LOG_WARNING (thread_env->log, AXIS2_LOG_SI, "Error while Processing Request");
+ AXIS2_LOG_WARNING(thread_env->log, AXIS2_LOG_SI, "Error while Processing Request");
}
- AXIS2_FREE (thread_env->allocator, request_resource_pack->request_content);
- AXIS2_FREE (thread_env->allocator, request_resource_pack->reply_to);
- AXIS2_FREE (thread_env->allocator, request_resource_pack);
+ AXIS2_FREE(thread_env->allocator, request_resource_pack->request_content);
+ AXIS2_FREE(thread_env->allocator, request_resource_pack->reply_to);
+ AXIS2_FREE(thread_env->allocator, request_resource_pack->content_type);
+ AXIS2_FREE(thread_env->allocator, request_resource_pack->soap_action);
+
+ AXIS2_FREE(thread_env->allocator, request_resource_pack);
if (thread_env)
{
@@ -64,7 +71,7 @@
}
#ifdef AXIS2_SVR_MULTI_THREADED
- axutil_thread_pool_exit_thread (env->thread_pool, thread);
+ axutil_thread_pool_exit_thread(env->thread_pool, thread);
#endif
return NULL;
@@ -72,8 +79,9 @@
axis2_status_t
-axis2_amqp_process_request (const axutil_env_t* env,
- axis2_amqp_request_processor_resource_pack_t* request_resource_pack)
+axis2_amqp_process_request(
+ const axutil_env_t* env,
+ axis2_amqp_request_processor_resource_pack_t* request_resource_pack)
{
axiom_xml_reader_t* xml_reader = NULL;
axiom_stax_builder_t* stax_builder = NULL;
@@ -82,90 +90,254 @@
axis2_transport_in_desc_t* in_desc = NULL;
axis2_msg_ctx_t* msg_ctx = NULL;
axiom_soap_envelope_t* soap_envelope = NULL;
- axutil_property_t* property = NULL;
axis2_engine_t* engine = NULL;
+ const axis2_char_t* soap_ns_uri = NULL;
+ axis2_bool_t is_soap_11 = AXIS2_FALSE;
+ axis2_char_t *soap_body_str = NULL;
+ int soap_body_len = 0;
+ axis2_bool_t is_mtom = AXIS2_FALSE;
+ axis2_status_t status = AXIS2_FAILURE;
+ axutil_hash_t *binary_data_map = NULL;
+ axiom_soap_body_t *soap_body = NULL;
+ axis2_endpoint_ref_t* reply_to = NULL;
+ axutil_property_t* reply_to_property = NULL;
- xml_reader = axiom_xml_reader_create_for_memory (env,
- request_resource_pack->request_content,
- axutil_strlen (request_resource_pack->request_content),
- NULL,
- AXIS2_XML_PARSER_TYPE_BUFFER);
- if (!xml_reader)
+ /* Create msg_ctx */
+ if (!request_resource_pack->conf_ctx)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Conf Context not Available");
+ return AXIS2_FAILURE;
+ }
+
+ out_desc = axis2_conf_get_transport_out(
+ axis2_conf_ctx_get_conf(request_resource_pack->conf_ctx, env),
+ env, AXIS2_TRANSPORT_ENUM_AMQP);
+ if (!out_desc)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Transport Out Descriptor not Found");
+ return AXIS2_FAILURE;
+ }
+
+ in_desc = axis2_conf_get_transport_in(
+ axis2_conf_ctx_get_conf(request_resource_pack->conf_ctx, env),
+ env, AXIS2_TRANSPORT_ENUM_AMQP);
+ if (!in_desc)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create XML Reader");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Transport In Descriptor not Found");
return AXIS2_FAILURE;
}
+
+ /* Create msg_ctx */
+ msg_ctx = axis2_msg_ctx_create(env, request_resource_pack->conf_ctx, in_desc, out_desc);
- stax_builder = axiom_stax_builder_create (env, xml_reader);
- if (!stax_builder)
+ axis2_msg_ctx_set_server_side(msg_ctx, env, AXIS2_TRUE);
+
+ /* Handle MTOM */
+ if (strstr(request_resource_pack->content_type, AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED))
+ {
+ axis2_char_t* mime_boundary =
+ axis2_amqp_util_get_value_from_content_type(env,
+ request_resource_pack->content_type,
+ AXIS2_AMQP_HEADER_CONTENT_TYPE_MIME_BOUNDARY);
+
+ if (mime_boundary)
+ {
+ axiom_mime_parser_t *mime_parser = NULL;
+ int soap_body_len = 0;
+ axutil_param_t *buffer_size_param = NULL;
+ axutil_param_t *max_buffers_param = NULL;
+ axutil_param_t *attachment_dir_param = NULL;
+ axis2_char_t *value_size = NULL;
+ axis2_char_t *value_num = NULL;
+ axis2_char_t *value_dir = NULL;
+ int size = 0;
+ int num = 0;
+
+ mime_parser = axiom_mime_parser_create(env);
+
+ buffer_size_param = axis2_msg_ctx_get_parameter(msg_ctx, env, AXIS2_MTOM_BUFFER_SIZE);
+ if (buffer_size_param)
+ {
+ value_size = (axis2_char_t*)axutil_param_get_value(buffer_size_param, env);
+ if (value_size)
+ {
+ size = atoi(value_size);
+ axiom_mime_parser_set_buffer_size(mime_parser, env, size);
+ }
+ }
+
+ max_buffers_param = axis2_msg_ctx_get_parameter(msg_ctx, env, AXIS2_MTOM_MAX_BUFFERS);
+ if (max_buffers_param)
+ {
+ value_num = (axis2_char_t*)axutil_param_get_value(max_buffers_param, env);
+ if (value_num)
+ {
+ num = atoi(value_num);
+ axiom_mime_parser_set_max_buffers(mime_parser, env, num);
+ }
+ }
+
+ /* If this paramter is there mime_parser will cached the attachment
+ * using to the directory for large attachments. */
+ attachment_dir_param = axis2_msg_ctx_get_parameter(msg_ctx, env, AXIS2_ATTACHMENT_DIR);
+ if (attachment_dir_param)
+ {
+ value_dir = (axis2_char_t*)axutil_param_get_value(attachment_dir_param, env);
+ if (value_dir)
+ {
+ axiom_mime_parser_set_attachment_dir(mime_parser, env, value_dir);
+ }
+ }
+
+ if (mime_parser)
+ {
+ axis2_callback_info_t *callback_ctx = NULL;
+ axutil_stream_t *stream = NULL;
+
+ callback_ctx = AXIS2_MALLOC(env->allocator, sizeof(axis2_callback_info_t));
+
+ stream = axutil_stream_create_basic(env);
+ if (stream)
+ {
+ axutil_stream_write(stream, env, request_resource_pack->request_content,
+ request_resource_pack->content_length);
+ callback_ctx->env = env;
+ callback_ctx->in_stream = stream;
+ callback_ctx->content_length = request_resource_pack->content_length;
+ callback_ctx->unread_len = request_resource_pack->content_length;
+ callback_ctx->chunked_stream = NULL;
+ }
+
+ binary_data_map =
+ axiom_mime_parser_parse(mime_parser, env,
+ axis2_amqp_util_on_data_request,
+ (void*)callback_ctx,
+ mime_boundary);
+ if (!binary_data_map)
+ {
+ return AXIS2_FAILURE;
+ }
+
+ soap_body_str = axiom_mime_parser_get_soap_body_str(mime_parser, env);
+ soap_body_len = axiom_mime_parser_get_soap_body_len(mime_parser, env);
+
+ axutil_stream_free(stream, env);
+ AXIS2_FREE(env->allocator, callback_ctx);
+ axiom_mime_parser_free(mime_parser, env);
+ }
+
+ AXIS2_FREE(env->allocator, mime_boundary);
+ }
+
+ is_mtom = AXIS2_TRUE;
+ }
+ else
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create StAX Builder");
+ soap_body_str = request_resource_pack->request_content;
+ soap_body_len = request_resource_pack->content_length;
+ }
+
+ soap_body_len = axutil_strlen(soap_body_str);
+
+ xml_reader = axiom_xml_reader_create_for_memory(env, soap_body_str, soap_body_len,
+ NULL, AXIS2_XML_PARSER_TYPE_BUFFER);
+ if (!xml_reader)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create XML Reader");
return AXIS2_FAILURE;
}
- soap_builder = axiom_soap_builder_create (env,
- stax_builder,
- AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI);
- if (!soap_builder)
+ stax_builder = axiom_stax_builder_create(env, xml_reader);
+ if (!stax_builder)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create SOAP Builder");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create StAX Builder");
return AXIS2_FAILURE;
}
- if (!request_resource_pack->conf_ctx)
+ soap_ns_uri = AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
+
+ if (request_resource_pack->content_type)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Conf Context not Available");
- return AXIS2_FAILURE;
+ if (strstr(request_resource_pack->content_type, AXIS2_AMQP_HEADER_ACCEPT_TEXT_XML))
+ {
+ is_soap_11 = AXIS2_TRUE;
+ soap_ns_uri = AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI;
+ }
+ /*if (strstr(request_resource_pack->content_type, AXIS2_AMQP_HEADER_ACCEPT_APPL_SOAP))
+ {
+ is_soap_11 = AXIS2_FALSE;
+ soap_ns_uri = AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
+ }
+ else if (strstr(request_resource_pack->content_type, AXIS2_AMQP_HEADER_ACCEPT_TEXT_XML))
+ {
+ is_soap_11 = AXIS2_TRUE;
+ soap_ns_uri = AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI;
+ }*/
}
- out_desc = axis2_conf_get_transport_out (axis2_conf_ctx_get_conf (request_resource_pack->conf_ctx,
- env),
- env,
- AXIS2_TRANSPORT_ENUM_AMQP);
- if (!out_desc)
+ soap_builder = axiom_soap_builder_create(env, stax_builder, soap_ns_uri);
+ if (!soap_builder)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Transport Out Descriptor not Found");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create SOAP Builder");
return AXIS2_FAILURE;
}
+
+ if (binary_data_map)
+ {
+ axiom_soap_builder_set_mime_body_parts(soap_builder, env, binary_data_map);
+ }
- in_desc = axis2_conf_get_transport_in (axis2_conf_ctx_get_conf (request_resource_pack->conf_ctx,
- env),
- env,
- AXIS2_TRANSPORT_ENUM_AMQP);
- if (!in_desc)
+ soap_envelope = axiom_soap_builder_get_soap_envelope(soap_builder, env);
+ axis2_msg_ctx_set_soap_envelope(msg_ctx, env, soap_envelope);
+
+ soap_body = axiom_soap_envelope_get_body(soap_envelope, env);
+
+ if (!soap_body)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Transport In Descriptor not Found");
return AXIS2_FAILURE;
}
- msg_ctx = axis2_msg_ctx_create (env,
- request_resource_pack->conf_ctx,
- in_desc,
- out_desc);
- axis2_msg_ctx_set_server_side (msg_ctx, env, AXIS2_TRUE);
+ /* SOAPAction */
+ if (request_resource_pack->soap_action)
+ {
+ axis2_msg_ctx_set_soap_action(msg_ctx, env,
+ axutil_string_create(env, request_resource_pack->soap_action));
+ }
- soap_envelope = axiom_soap_builder_get_soap_envelope (soap_builder, env);
- axis2_msg_ctx_set_soap_envelope (msg_ctx, env, soap_envelope);
+ /* SOAP version */
+ axis2_msg_ctx_set_is_soap_11(msg_ctx, env, is_soap_11);
/* Set ReplyTo in the msg_ctx */
- property = axutil_property_create (env);
- axutil_property_set_scope (property, env, AXIS2_SCOPE_REQUEST);
- axutil_property_set_value (property, env, request_resource_pack->reply_to);
- axis2_msg_ctx_set_property (msg_ctx, env, AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO, property);
-
- /* Set Qpid Broker IP in the msg_ctx */
- property = axutil_property_create (env);
- axutil_property_set_scope (property, env, AXIS2_SCOPE_REQUEST);
- axutil_property_set_value (property, env, request_resource_pack->qpid_broker_ip);
- axis2_msg_ctx_set_property (msg_ctx, env, AXIS2_AMQP_MSG_CTX_PROPERTY_QPID_BROKER_IP, property);
-
- /* Set Qpid Broker Port in the msg_ctx */
- property = axutil_property_create (env);
- axutil_property_set_scope (property, env, AXIS2_SCOPE_REQUEST);
- axutil_property_set_value (property, env, &request_resource_pack->qpid_broker_port);
- axis2_msg_ctx_set_property (msg_ctx, env, AXIS2_AMQP_MSG_CTX_PROPERTY_QPID_BROKER_PORT, property);
+ reply_to = axis2_endpoint_ref_create(env, request_resource_pack->reply_to);
+ axis2_msg_ctx_set_reply_to(msg_ctx, env, reply_to);
- engine = axis2_engine_create (env, request_resource_pack->conf_ctx);
+ /* Set ReplyTo in the msg_ctx as a property. This is useful when
+ * server replies in the dual-channel case */
+ reply_to_property = axutil_property_create_with_args(env, AXIS2_SCOPE_REQUEST, 0, 0,
+ (void*)request_resource_pack->reply_to);
+ axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO,
+ reply_to_property);
+
+ engine = axis2_engine_create(env, request_resource_pack->conf_ctx);
+
+ if (AXIS2_TRUE == axiom_soap_body_has_fault(soap_body, env))
+ {
+ status = axis2_engine_receive_fault(engine, env, msg_ctx);
+ }
+ else
+ {
+ status = axis2_engine_receive(engine, env, msg_ctx);
+ }
+
+ if (engine)
+ {
+ axis2_engine_free(engine, env);
+ }
+
+ if (soap_body_str && is_mtom)
+ {
+ AXIS2_FREE(env->allocator, soap_body_str);
+ }
- return axis2_engine_receive (engine, env, msg_ctx);
+ return status;
}
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/receiver/qpid_receiver/request_processor/axis2_amqp_request_processor.h Wed Nov 19 22:28:35 2008
@@ -26,25 +26,27 @@
{
#endif
-typedef struct axis2_amqp_request_processor_resource_pack
-{
- axutil_env_t* env;
- axis2_conf_ctx_t* conf_ctx;
- axis2_char_t* request_content;
- axis2_char_t* reply_to;
- axis2_char_t* qpid_broker_ip;
- int qpid_broker_port;
-}
-axis2_amqp_request_processor_resource_pack_t;
+ typedef struct axis2_amqp_request_processor_resource_pack
+ {
+ axutil_env_t* env;
+ axis2_conf_ctx_t* conf_ctx;
+ axis2_char_t* request_content;
+ int content_length;
+ axis2_char_t* reply_to;
+ axis2_char_t* content_type;
+ axis2_char_t* soap_action;
+ } axis2_amqp_request_processor_resource_pack_t;
-/* The worker thread function */
-void* AXIS2_THREAD_FUNC
-axis2_amqp_request_processor_thread_function (axutil_thread_t* thread,
- void* request_data);
+ /* The worker thread function */
+ void* AXIS2_THREAD_FUNC
+ axis2_amqp_request_processor_thread_function(
+ axutil_thread_t* thread,
+ void* request_data);
-axis2_status_t
-axis2_amqp_process_request (const axutil_env_t* env,
- axis2_amqp_request_processor_resource_pack_t* request_resource_pack);
+ axis2_status_t
+ axis2_amqp_process_request(
+ const axutil_env_t* env,
+ axis2_amqp_request_processor_resource_pack_t* request_resource_pack);
#ifdef __cplusplus
}
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/Makefile.am?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/Makefile.am (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/Makefile.am Wed Nov 19 22:28:35 2008
@@ -12,6 +12,8 @@
INCLUDES = -I$(top_builddir)/include \
-I$(top_builddir)/src/core/transport/amqp/util \
+ -I$(top_builddir)/src/core/transport/amqp/receiver \
+ -I$(top_builddir)/src/core/transport/amqp/receiver/qpid_receiver \
-I$(top_builddir)/src/core/transport/amqp/sender \
-I$(top_builddir)/src/core/transport/amqp/sender/qpid_sender \
-I$(top_builddir)/src/core/description \
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.c Wed Nov 19 22:28:35 2008
@@ -16,8 +16,10 @@
*/
#include <axiom_soap.h>
+#include <axis2_transport_in_desc.h>
#include <axis2_amqp_defines.h>
#include <axis2_amqp_util.h>
+#include <axis2_amqp_receiver.h>
#include <axis2_amqp_sender.h>
static const axis2_transport_sender_ops_t amqp_sender_ops = {
@@ -26,187 +28,260 @@
axis2_amqp_sender_clean_up,
axis2_amqp_sender_free};
-axis2_transport_sender_t* AXIS2_CALL
-axis2_amqp_sender_create (const axutil_env_t* env)
+AXIS2_EXTERN axis2_transport_sender_t* AXIS2_CALL
+axis2_amqp_sender_create(const axutil_env_t* env)
{
- AXIS2_ENV_CHECK (env, NULL);
+ AXIS2_ENV_CHECK(env, NULL);
axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
sender_resource_pack = (axis2_amqp_sender_resource_pack_t*)
- AXIS2_MALLOC (env->allocator,
- sizeof (axis2_amqp_sender_resource_pack_t));
+ AXIS2_MALLOC(env->allocator,
+ sizeof(axis2_amqp_sender_resource_pack_t));
if (!sender_resource_pack)
{
- AXIS2_ERROR_SET (env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
+ AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
return NULL;
}
sender_resource_pack->sender.ops = &amqp_sender_ops;
+ sender_resource_pack->conf_ctx = NULL;
return &(sender_resource_pack->sender);
}
-axis2_status_t AXIS2_CALL
-axis2_amqp_sender_init (axis2_transport_sender_t* sender,
- const axutil_env_t* env,
- axis2_conf_ctx_t* conf_ctx,
- axis2_transport_out_desc_t* out_desc)
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_amqp_sender_init(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx,
+ axis2_transport_out_desc_t* out_desc)
{
+ axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
+ sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
+
+ sender_resource_pack->conf_ctx = conf_ctx;
+
return AXIS2_SUCCESS;
}
-axis2_status_t AXIS2_CALL
-axis2_amqp_sender_invoke (axis2_transport_sender_t* sender,
- const axutil_env_t* env,
- axis2_msg_ctx_t* msg_ctx)
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_amqp_sender_invoke(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env,
+ axis2_msg_ctx_t* msg_ctx)
{
- AXIS2_ENV_CHECK (env, AXIS2_FAILURE);
- AXIS2_PARAM_CHECK (env->error, msg_ctx, AXIS2_FAILURE);
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+ AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
axiom_soap_envelope_t* request_soap_envelope = NULL;
axiom_xml_writer_t* xml_writer = NULL;
axiom_output_t* request_om_output = NULL;
axis2_char_t* request_content = NULL;
- axis2_char_t* response_content = NULL;
axiom_soap_envelope_t* response_soap_envelope = NULL;
- axis2_op_t* op = NULL;
- const axis2_char_t* mep = NULL;
axis2_bool_t is_server = AXIS2_TRUE;
- axis2_char_t* qpid_broker_ip = NULL;
- int qpid_broker_port = 0;
- axis2_char_t* send_to = NULL;
+ axis2_bool_t is_soap_11 = AXIS2_FALSE;
+ axutil_string_t* content_type = NULL;
+ const axis2_char_t* soap_action = NULL;
+ axis2_bool_t do_mtom = AXIS2_FALSE;
+ axis2_bool_t status = AXIS2_FAILURE;
- request_soap_envelope = axis2_msg_ctx_get_soap_envelope (msg_ctx, env);
+ request_soap_envelope = axis2_msg_ctx_get_soap_envelope(msg_ctx, env);
- xml_writer = axiom_xml_writer_create_for_memory (env,
- NULL,
- AXIS2_TRUE,
- 0,
- AXIS2_XML_PARSER_TYPE_BUFFER);
+ xml_writer = axiom_xml_writer_create_for_memory(env, NULL, AXIS2_TRUE, 0,
+ AXIS2_XML_PARSER_TYPE_BUFFER);
if (!xml_writer)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create XML Writer");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create XML Writer");
return AXIS2_FAILURE;
}
- request_om_output = axiom_output_create (env, xml_writer);
+ request_om_output = axiom_output_create(env, xml_writer);
if (!request_om_output)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Create OM Output");
- axiom_xml_writer_free (xml_writer, env);
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create OM Output");
+ axiom_xml_writer_free(xml_writer, env);
xml_writer = NULL;
return AXIS2_FAILURE;
}
- axiom_soap_envelope_serialize (request_soap_envelope, env, request_om_output, AXIS2_FALSE);
+ is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
+
+ /* Set SOAP version */
+ axiom_output_set_soap11(request_om_output, env, is_soap_11);
+
+ /* Content-Type */
+ if (AXIS2_TRUE == is_soap_11)
+ {
+ /* SOAP1.1 */
+ content_type = axutil_string_create(env, AXIS2_AMQP_HEADER_ACCEPT_TEXT_XML);
+ }
+ else
+ {
+ /* SOAP1.2 */
+ content_type = axutil_string_create(env, AXIS2_AMQP_HEADER_ACCEPT_APPL_SOAP);
+ }
- request_content = (axis2_char_t*)axiom_xml_writer_get_xml (xml_writer, env);
+ /* SOAP action */
+ soap_action = axutil_string_get_buffer(axis2_msg_ctx_get_soap_action(msg_ctx, env), env);
+
+ if (!soap_action)
+ soap_action = "";
+
+ /* Handle MTOM */
+ do_mtom = axis2_msg_ctx_get_doing_mtom(msg_ctx, env);
+
+ axiom_output_set_do_optimize(request_om_output, env, do_mtom);
+ axiom_soap_envelope_serialize(request_soap_envelope, env, request_om_output, AXIS2_FALSE);
+
+ if (do_mtom)
+ {
+ axis2_status_t mtom_status = AXIS2_FAILURE;
+ axutil_array_list_t* mime_parts = NULL;
+
+ mtom_status = axiom_output_flush(request_om_output, env);
+
+ if (mtom_status == AXIS2_SUCCESS)
+ {
+ mime_parts = axiom_output_get_mime_parts(request_om_output, env);
+ if (!mime_parts)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "Unable to create the mime part list from request_om_output");
+
+ return AXIS2_FAILURE;
+ }
+ else
+ {
+ axis2_msg_ctx_set_mime_parts(msg_ctx, env, mime_parts);
+ }
+ }
+
+ content_type = axutil_string_create(env,
+ axiom_output_get_content_type(request_om_output, env));
+ }
+
+ request_content = (axis2_char_t*)axiom_xml_writer_get_xml(xml_writer, env);
if (!request_content)
{
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, "Failed to Serialize the SOAP Envelope");
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Serialize the SOAP Envelope");
return AXIS2_FAILURE;
}
- op = axis2_msg_ctx_get_op (msg_ctx, env);
- mep = axis2_op_get_msg_exchange_pattern (op, env);
-
- is_server = axis2_msg_ctx_get_server_side (msg_ctx, env);
+ is_server = axis2_msg_ctx_get_server_side(msg_ctx, env);
if (is_server)
{
- /* Get Qpid Broker IP/Port/ReplyTo from the msg_ctx */
- qpid_broker_ip = (axis2_char_t*)axis2_msg_ctx_get_property_value (msg_ctx,
- env,
- AXIS2_AMQP_MSG_CTX_PROPERTY_QPID_BROKER_IP);
-
- qpid_broker_port = *(int*)axis2_msg_ctx_get_property_value (msg_ctx,
- env,
- AXIS2_AMQP_MSG_CTX_PROPERTY_QPID_BROKER_PORT);
-
- send_to = (axis2_char_t*)axis2_msg_ctx_get_property_value (msg_ctx,
- env,
- AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO);
-
- axis2_qpid_server_send (request_content,
- qpid_broker_ip,
- qpid_broker_port,
- send_to);
+ status = axis2_qpid_server_send(request_content, env,
+ axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
}
else
{
- /* Get Qpid Broker IP/Port from the Request URI */
- qpid_broker_ip = axis2_amqp_util_get_qpid_broker_ip (msg_ctx, env);
- qpid_broker_port = axis2_amqp_util_get_qpid_broker_port (msg_ctx, env);
+ axis2_op_t* op = NULL;
+ const axis2_char_t* mep = NULL;
+
+ op = axis2_msg_ctx_get_op(msg_ctx, env);
+ mep = axis2_op_get_msg_exchange_pattern(op, env);
- if (axutil_strcmp (mep, AXIS2_MEP_URI_OUT_ONLY) == 0 ||
- axutil_strcmp (mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY) == 0)
+ if (axutil_strcmp(mep, AXIS2_MEP_URI_OUT_ONLY) == 0 ||
+ axutil_strcmp(mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY) == 0) /* One-way */
{
- axis2_qpid_client_send (request_content,
- qpid_broker_ip,
- qpid_broker_port);
+ status = axis2_qpid_client_send_robust(request_content, env,
+ axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+
+ /* Set Status Code in msg_ctx */
+ axis2_msg_ctx_set_status_code(msg_ctx, env, status);
}
else
{
- response_content = axis2_qpid_client_request (request_content,
- qpid_broker_ip,
- qpid_broker_port,
- env);
+ if (AXIS2_TRUE == axis2_amqp_util_msg_ctx_get_use_separate_listener(msg_ctx, env)) /* Dual Channel */
+ {
+ axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
+ sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
+
+ if (!sender_resource_pack->conf_ctx)
+ return AXIS2_FAILURE;
+
+ axis2_char_t* reply_to_queue_name = NULL;
+ reply_to_queue_name = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(
+ sender_resource_pack->conf_ctx, env);
- if (response_content)
+ if (!reply_to_queue_name)
+ return AXIS2_FAILURE;
+
+ status = axis2_qpid_client_send_dual(request_content, env, reply_to_queue_name,
+ axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+ }
+ else /* Single Channel */
{
- response_soap_envelope = axis2_amqp_util_get_soap_envelope (response_content, env);
+ axis2_amqp_binary_data_buffer_t* binary_data_buffer = NULL;
+ binary_data_buffer = axis2_qpid_client_send_receive(request_content, env,
+ axutil_string_get_buffer(content_type, env), soap_action, msg_ctx);
+
+ if (binary_data_buffer)
+ {
+ response_soap_envelope = axis2_amqp_util_get_soap_envelope(binary_data_buffer, env, msg_ctx);
- if (response_soap_envelope)
- axis2_msg_ctx_set_response_soap_envelope (msg_ctx, env, response_soap_envelope);
+ if (response_soap_envelope)
+ axis2_msg_ctx_set_response_soap_envelope(msg_ctx, env, response_soap_envelope);
+
+ AXIS2_FREE(env->allocator, binary_data_buffer->data);
+ AXIS2_FREE(env->allocator, binary_data_buffer->content_type);
+ AXIS2_FREE(env->allocator, binary_data_buffer);
- /*AXIS2_FREE (env->allocator, response_content);*/
+ status = AXIS2_SUCCESS;
+ }
}
}
}
+
+ if (content_type)
+ axutil_string_free(content_type, env);
- return AXIS2_SUCCESS;
+ return status;
}
-axis2_status_t AXIS2_CALL
-axis2_amqp_sender_clean_up (axis2_transport_sender_t* sender,
- const axutil_env_t* env,
- axis2_msg_ctx_t* msg_ctx)
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_amqp_sender_clean_up(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env,
+ axis2_msg_ctx_t* msg_ctx)
{
return AXIS2_SUCCESS;
}
-void AXIS2_CALL
-axis2_amqp_sender_free (axis2_transport_sender_t* sender,
- const axutil_env_t* env)
+AXIS2_EXTERN void AXIS2_CALL
+axis2_amqp_sender_free(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env)
{
- AXIS2_ENV_CHECK (env, void);
+ AXIS2_ENV_CHECK(env, void);
axis2_amqp_sender_resource_pack_t* sender_resource_pack = NULL;
- sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK (sender);
+ sender_resource_pack = AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(sender);
- AXIS2_FREE (env->allocator, sender_resource_pack);
+ AXIS2_FREE(env->allocator, sender_resource_pack);
}
/* Library Exports */
AXIS2_EXPORT int
#ifndef AXIS2_STATIC_DEPLOY
-axis2_get_instance (
+axis2_get_instance(
#else
-axis2_amqp_sender_get_instance (
+axis2_amqp_sender_get_instance(
#endif
struct axis2_transport_sender** inst,
- const axutil_env_t* env)
+ const axutil_env_t* env)
{
int status = AXIS2_SUCCESS;
- *inst = axis2_amqp_sender_create (env);
+ *inst = axis2_amqp_sender_create(env);
if (!(*inst))
{
status = AXIS2_FAILURE;
@@ -218,16 +293,16 @@
AXIS2_EXPORT int
#ifndef AXIS2_STATIC_DEPLOY
-axis2_remove_instance (
+axis2_remove_instance(
#else
-axis2_amqp_sender_remove_instance (
+axis2_amqp_sender_remove_instance(
#endif
axis2_transport_sender_t* inst,
- const axutil_env_t* env)
+ const axutil_env_t* env)
{
if (inst)
{
- axis2_transport_sender_free (inst, env);
+ axis2_transport_sender_free(inst, env);
}
return AXIS2_SUCCESS;
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/axis2_amqp_sender.h Wed Nov 19 22:28:35 2008
@@ -26,33 +26,38 @@
typedef struct axis2_amqp_sender_resource_pack
{
axis2_transport_sender_t sender;
+ axis2_conf_ctx_t* conf_ctx;
}
axis2_amqp_sender_resource_pack_t;
#define AXIS2_AMQP_SENDER_TO_RESOURCE_PACK(amqp_sender) \
((axis2_amqp_sender_resource_pack_t*)(amqp_sender))
-axis2_transport_sender_t* AXIS2_CALL
-axis2_amqp_sender_create (const axutil_env_t* env);
+AXIS2_EXTERN axis2_transport_sender_t* AXIS2_CALL
+axis2_amqp_sender_create(const axutil_env_t* env);
-axis2_status_t AXIS2_CALL
-axis2_amqp_sender_init (axis2_transport_sender_t* sender,
- const axutil_env_t* env,
- axis2_conf_ctx_t* conf_ctx,
- axis2_transport_out_desc_t* out_desc);
-
-axis2_status_t AXIS2_CALL
-axis2_amqp_sender_invoke (axis2_transport_sender_t* sender,
- const axutil_env_t* env,
- axis2_msg_ctx_t* msg_ctx);
-
-axis2_status_t AXIS2_CALL
-axis2_amqp_sender_clean_up (axis2_transport_sender_t* sender,
- const axutil_env_t* env,
- axis2_msg_ctx_t* msg_ctx);
-
-void AXIS2_CALL
-axis2_amqp_sender_free (axis2_transport_sender_t* sender,
- const axutil_env_t* env);
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_amqp_sender_init(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx,
+ axis2_transport_out_desc_t* out_desc);
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_amqp_sender_invoke(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env,
+ axis2_msg_ctx_t* msg_ctx);
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_amqp_sender_clean_up(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env,
+ axis2_msg_ctx_t* msg_ctx);
+
+AXIS2_EXTERN void AXIS2_CALL
+axis2_amqp_sender_free(
+ axis2_transport_sender_t* sender,
+ const axutil_env_t* env);
#endif
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/Makefile.am?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/Makefile.am (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/Makefile.am Wed Nov 19 22:28:35 2008
@@ -4,6 +4,7 @@
axis2_qpid_sender_interface.cpp
libaxis2_qpid_sender_la_LIBADD = $(top_builddir)/util/src/libaxutil.la \
+ $(top_builddir)/src/core/transport/amqp/util/libaxis2_amqp_util.la \
$(QPID_HOME)/lib/libqpidclient.la
libaxis2_qpid_sender_la_LDFLAGS = g++ -version-info $(VERSION_NO)
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.cpp Wed Nov 19 22:28:35 2008
@@ -16,143 +16,306 @@
*/
#include <axis2_amqp_defines.h>
+#include <axiom_mime_part.h>
+#include <fstream>
#include <axis2_qpid_sender.h>
-Axis2QpidSender::Axis2QpidSender (string qpidBrokerIP,
- int qpidBrokerPort)
+using namespace std;
+
+Axis2QpidSender::Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env)
{
this->qpidBrokerIP = qpidBrokerIP;
this->qpidBrokerPort = qpidBrokerPort;
- this->dispatcher = NULL;
+ this->env = env;
this->responseContent = "";
- this->responseQueue.clear ();
+ this->responseContentType = "";
+ this->subscriptionManager = NULL;
}
-Axis2QpidSender::~Axis2QpidSender (void)
+Axis2QpidSender::~Axis2QpidSender(void)
{
- if (dispatcher)
- delete dispatcher;
+ if (subscriptionManager)
+ delete subscriptionManager;
}
-bool Axis2QpidSender::CreateSession (void)
+bool Axis2QpidSender::ClientSendReceive(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts)
{
bool status = false;
-
+ this->responseContent = "";
+ this->responseContentType = "";
+
try
{
- /* Create Connection to Qpid Broker */
- connection.open (qpidBrokerIP, qpidBrokerPort);
+ Connection connection;
+ connection.open(qpidBrokerIP, qpidBrokerPort);
+
+ Session session = connection.newSession();
+
+ /* Declare Private Queue */
+ string replyToQueueName = AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX;
+ replyToQueueName.append(axutil_uuid_gen(env));
+
+ session.queueDeclare(arg::queue = replyToQueueName, arg::autoDelete = true);
+ session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
+ arg::queue = replyToQueueName,
+ arg::bindingKey = replyToQueueName);
+
+ /* Create Message */
+ Message message;
+
+ message.getDeliveryProperties().setRoutingKey(toQueueName);
+ message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
- session = connection.newSession ();
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
+ if (mime_parts)
+ {
+ string mimeBody;
+ GetMimeBody(mime_parts, mimeBody);
+
+ messageContent.clear();/* MIME parts include SOAP envelop */
+
+ messageContent.append(mimeBody);
+ }
+
+ message.setData(messageContent);
+
+ async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
+
+ /* Create Subscription manager */
+ subscriptionManager = new SubscriptionManager(session);
+
+ subscriptionManager->subscribe(*this, replyToQueueName);
+ subscriptionManager->run();
+
+ /* Current thread gets bloked here until the response hits the message listener */
+
+ connection.close();
+
status = true;
}
catch (const std::exception& e)
{
- connection.close ();
}
return status;
}
-string Axis2QpidSender::Request (string messageContent)
+bool Axis2QpidSender::ClientSendRobust(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts)
{
- responseContent = "";
-
- bool sessionCreated = CreateSession ();
- if (sessionCreated)
+ bool status = false;
+
+ try
{
+ Connection connection;
+ connection.open(qpidBrokerIP, qpidBrokerPort);
+
+ Session session = connection.newSession();
+
/* Declare Private Queue */
- responseQueue << "client" << session.getId ().getName ();
+ string replyToQueueName = AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX;
+ replyToQueueName.append(axutil_uuid_gen(env));
- session.queueDeclare (arg::queue = responseQueue.str ());
- session.exchangeBind (arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
- arg::queue = responseQueue.str (),
- arg::bindingKey = responseQueue.str ());
+ session.queueDeclare(arg::queue = replyToQueueName);
+ session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
+ arg::queue = replyToQueueName,
+ arg::bindingKey = replyToQueueName);
- dispatcher = new Dispatcher (session);
+ /* Create Message */
+ Message message;
+
+ message.getDeliveryProperties().setRoutingKey(toQueueName);
+ message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
- listen ();
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
- /* Create Message */
- Message message;
- message.getDeliveryProperties ().setRoutingKey (AXIS2_AMQP_RECEIVER_QUEUE_BIND_KEY);
- message.getMessageProperties ().setReplyTo (ReplyTo (AXIS2_AMQP_EXCHANGE_DIRECT, responseQueue.str ()));
- message.setData (messageContent);
+ if (mime_parts)
+ {
+ string mimeBody;
+ GetMimeBody(mime_parts, mimeBody);
+
+ messageContent.clear();/* MIME parts include SOAP envelop */
+
+ messageContent.append(mimeBody);
+ }
- async (session).messageTransfer (arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
+ message.setData(messageContent);
+
+ async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
- wait ();
+ connection.close();
- connection.close ();
+ status = true;
+ }
+ catch (const std::exception& e)
+ {
}
- return responseContent;
+ return status;
}
-void Axis2QpidSender::Send (string messageContent, string to)
+bool Axis2QpidSender::ClientSendDual(string messageContent, string toQueueName, string replyToQueueName,
+ bool isSOAP11, string contentType, string soapAction, axutil_array_list_t* mime_parts)
{
- bool sessionCreated = CreateSession ();
- if (sessionCreated)
+ bool status = false;
+
+ try
{
+ Connection connection;
+ connection.open(qpidBrokerIP, qpidBrokerPort);
+
+ Session session = connection.newSession();
+
+ session.queueDeclare(arg::queue = replyToQueueName);
+ session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
+ arg::queue = replyToQueueName,
+ arg::bindingKey = replyToQueueName);
+
+ /* Create Message */
Message message;
- message.getDeliveryProperties ().setRoutingKey (to);
- message.setData (messageContent);
+ message.getDeliveryProperties().setRoutingKey(toQueueName);
+ message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
+
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
+
+ if (mime_parts)
+ {
+ string mimeBody;
+ GetMimeBody(mime_parts, mimeBody);
+
+ messageContent.clear();/* MIME parts include SOAP envelop */
+
+ messageContent.append(mimeBody);
+ }
+
+ message.setData(messageContent);
async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
- connection.close ();
+ connection.close();
+
+ status = true;
+ }
+ catch (const std::exception& e)
+ {
}
+
+ return status;
}
-void Axis2QpidSender::Send (string messageContent)
+bool Axis2QpidSender::ServerSend(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts)
{
- bool sessionCreated = CreateSession ();
- if (sessionCreated)
+ bool status = false;
+
+ try
{
- Message message;
+ Connection connection;
+ connection.open(qpidBrokerIP, qpidBrokerPort);
+
+ Session session = connection.newSession();
- message.setData (messageContent);
+ /* Create Message */
+ Message message;
- async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
+ message.getDeliveryProperties().setRoutingKey(toQueueName);
+
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
+ message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
- connection.close ();
- }
-}
+ if (mime_parts)
+ {
+ string mimeBody;
+ GetMimeBody(mime_parts, mimeBody);
+
+ messageContent.clear();/* MIME parts include SOAP envelop */
+ messageContent.append(mimeBody);
+ }
-void Axis2QpidSender::received (Message& message)
-{
- responseContent = message.getData ();
+ message.setData(messageContent);
- dispatcher->stop ();
-}
+ async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
+ connection.close();
-void Axis2QpidSender::listen (void)
-{
- if (!dispatcher)
- return;
+ status = true;
+ }
+ catch (const std::exception& e)
+ {
+ }
- session.messageSubscribe (arg::queue = responseQueue.str (), arg::destination = responseQueue.str ());
+ return status;
+}
- session.messageFlow (arg::destination = responseQueue.str (), arg::unit = MESSAGE_CREDIT, arg::value = 1);
- session.messageFlow (arg::destination = responseQueue.str (), arg::unit = BYTE_CREDIT, arg::value = UNLIMITED_CREDIT);
+void Axis2QpidSender::received(Message& message)
+{
+ responseContent = message.getData();
+ responseContentType = message.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
- dispatcher->listen (responseQueue.str (), this);
+ if (subscriptionManager)
+ subscriptionManager->cancel(message.getDestination());
}
-void Axis2QpidSender::wait (void)
+void Axis2QpidSender::GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody)
{
- if (!dispatcher)
+ int i = 0;
+ axiom_mime_part_t *mime_part = NULL;
+ axis2_status_t status = AXIS2_SUCCESS;
+
+ if (!mime_parts)
return;
- dispatcher->run ();
+ for (i = 0; i < axutil_array_list_size(mime_parts, env); i++)
+ {
+ mime_part = (axiom_mime_part_t *)axutil_array_list_get(mime_parts, env, i);
+
+ if (mime_part->type == AXIOM_MIME_PART_BUFFER)
+ {
+ mimeBody.append(mime_part->part, mime_part->part_size);
+ }
+ else if (mime_part->type == AXIOM_MIME_PART_FILE)
+ {
+ int length;
+ char* buffer;
+
+ ifstream file;
+ file.open(mime_part->file_name, ios::binary);
+
+ file.seekg(0, ios::end);
+ length = file.tellg();
+ file.seekg(0, ios::beg);
+
+ buffer = new char[length];
+
+ file.read(buffer, length);
+ file.close();
+
+ mimeBody.append(buffer, length);
+
+ delete [] buffer;
+ }
+ else
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Unknown mime type");
+ return;
+ }
+
+ if (status == AXIS2_FAILURE)
+ {
+ break;
+ }
+ }
}
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender.h Wed Nov 19 22:28:35 2008
@@ -19,10 +19,11 @@
#define AXIS2_QPID_SENDER_H
#include <qpid/client/Connection.h>
-#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <axis2_util.h>
#include <sstream>
#include <string>
@@ -35,31 +36,33 @@
class Axis2QpidSender : public MessageListener
{
public:
- Axis2QpidSender (string qpidBrokerIP,
- int qpidBrokerPort);
- ~Axis2QpidSender (void);
+ Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env);
+ ~Axis2QpidSender(void);
- /* Out-In */
- string Request (string messageContent); /* Used by Clients */
+ /* Client */
+ bool ClientSendReceive(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts);
+ bool ClientSendRobust(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts);
+ bool ClientSendDual(string messageContent, string toQueueName, string replyToQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts);
- /* Out Only */
- void Send (string messageContent, string to); /* Used by Server */
- void Send (string messageContent); /* Used by Clients */
+ /* Server */
+ bool ServerSend(string messageContent, string toQueueName, bool isSOAP11,
+ string contentType, string soapAction, axutil_array_list_t* mime_parts);
+
+ string responseContent;
+ string responseContentType;
private:
- bool CreateSession (void);
+ virtual void received(Message& message);
+
+ void GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody);
- virtual void received (Message& message);
- virtual void listen (void);
- virtual void wait (void);
-
- string qpidBrokerIP;
- int qpidBrokerPort;
- Connection connection;
- Dispatcher* dispatcher;
- Session session;
- string responseContent;
- stringstream responseQueue;
+ string qpidBrokerIP;
+ int qpidBrokerPort;
+ const axutil_env_t* env;
+ SubscriptionManager* subscriptionManager;
};
#endif
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.cpp Wed Nov 19 22:28:35 2008
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+#include <axis2_amqp_util.h>
#include <axis2_qpid_sender.h>
#include <axis2_qpid_sender_interface.h>
@@ -23,45 +24,169 @@
{
#endif
-axis2_char_t* AXIS2_CALL
-axis2_qpid_client_request (const axis2_char_t* request_content,
- const axis2_char_t* qpid_broker_ip,
- int qpid_broker_port,
- const axutil_env_t* env)
+AXIS2_EXTERN axis2_amqp_binary_data_buffer_t* AXIS2_CALL
+axis2_qpid_client_send_receive(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx)
{
+ axis2_amqp_destination_info_t* destination_info = NULL;
+
+ destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
+
+ if (!destination_info || !destination_info->broker_ip ||
+ !destination_info->broker_port || !destination_info->queue_name)
+ {
+ return NULL;
+ }
+
+ axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
+ axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
+
/* Get Response */
- Axis2QpidSender qpid_sender (qpid_broker_ip, qpid_broker_port);
- string response_content = qpid_sender.Request (request_content);
+ Axis2QpidSender qpid_sender(destination_info->broker_ip,
+ destination_info->broker_port, env);
+ bool status = qpid_sender.ClientSendReceive(request_content, destination_info->queue_name,
+ is_soap_11, content_type, soap_action, mime_parts);
+
+ AXIS2_FREE(env->allocator, destination_info->broker_ip);
+ AXIS2_FREE(env->allocator, destination_info->queue_name);
+ AXIS2_FREE(env->allocator, destination_info);
+
+ if (!status)
+ {
+ return NULL;
+ }
/* Create a Copy and Return */
- axis2_char_t* response_content_buffer = (axis2_char_t*)AXIS2_MALLOC (env->allocator,
- response_content.size () + 1);
- strcpy (response_content_buffer, response_content.c_str ());
+ axis2_amqp_binary_data_buffer_t* binary_data_buffer = (axis2_amqp_binary_data_buffer_t*)
+ AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_binary_data_buffer_t));
+
+ /* Data */
+ binary_data_buffer->data = AXIS2_MALLOC(env->allocator, qpid_sender.responseContent.size());
+ memcpy(binary_data_buffer->data, qpid_sender.responseContent.c_str(),
+ qpid_sender.responseContent.size());
+
+ /* Length */
+ binary_data_buffer->length = qpid_sender.responseContent.size();
+
+ /* ContentType */
+ binary_data_buffer->content_type = (axis2_char_t*)
+ AXIS2_MALLOC(env->allocator, qpid_sender.responseContentType.size() + 1);
+ strcpy(binary_data_buffer->content_type, qpid_sender.responseContentType.c_str());
+
+ return binary_data_buffer;
+}
+
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_client_send_robust(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx)
+{
+ axis2_amqp_destination_info_t* destination_info = NULL;
+ axis2_status_t status = AXIS2_FAILURE;
+
+ destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
+
+ if (!destination_info || !destination_info->broker_ip ||
+ !destination_info->broker_port || !destination_info->queue_name)
+ {
+ return AXIS2_FAILURE;
+ }
+
+ axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
+ axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
- return response_content_buffer;
+ Axis2QpidSender qpid_sender(destination_info->broker_ip,
+ destination_info->broker_port, env);
+
+ status = qpid_sender.ClientSendRobust(request_content, destination_info->queue_name,
+ is_soap_11, content_type, soap_action, mime_parts);
+
+ AXIS2_FREE(env->allocator, destination_info->broker_ip);
+ AXIS2_FREE(env->allocator, destination_info->queue_name);
+ AXIS2_FREE(env->allocator, destination_info);
+
+ return status;
}
-void AXIS2_CALL
-axis2_qpid_server_send (const axis2_char_t* request_content,
- const axis2_char_t* qpid_broker_ip,
- int qpid_broker_port,
- const axis2_char_t* to)
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_client_send_dual(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* reply_to_queue_name,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx)
{
- Axis2QpidSender qpid_sender (qpid_broker_ip, qpid_broker_port);
+ axis2_amqp_destination_info_t* destination_info = NULL;
+ axis2_status_t status = AXIS2_FAILURE;
+
+ destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
+
+ if (!destination_info || !destination_info->broker_ip ||
+ !destination_info->broker_port || !destination_info->queue_name)
+ {
+ return AXIS2_FAILURE;
+ }
+
+ axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
+ axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
- qpid_sender.Send (request_content, to);
+ Axis2QpidSender qpid_sender(destination_info->broker_ip,
+ destination_info->broker_port, env);
+
+ status = qpid_sender.ClientSendDual(request_content, destination_info->queue_name,
+ reply_to_queue_name, is_soap_11, content_type, soap_action, mime_parts);
+
+ AXIS2_FREE(env->allocator, destination_info->broker_ip);
+ AXIS2_FREE(env->allocator, destination_info->queue_name);
+ AXIS2_FREE(env->allocator, destination_info);
+
+ return status;
}
-void AXIS2_CALL
-axis2_qpid_client_send (const axis2_char_t* request_content,
- const axis2_char_t* qpid_broker_ip,
- int qpid_broker_port)
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_server_send(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx)
{
- Axis2QpidSender qpid_sender (qpid_broker_ip, qpid_broker_port);
-
- qpid_sender.Send (request_content);
+ axis2_amqp_destination_info_t* destination_info = NULL;
+ axis2_status_t status = AXIS2_FAILURE;
+
+ destination_info = axis2_amqp_util_msg_ctx_get_destination_info(msg_ctx, env);
+
+ if (!destination_info || !destination_info->broker_ip ||
+ !destination_info->broker_port || !destination_info->queue_name)
+ {
+ return AXIS2_FAILURE;
+ }
+
+ axis2_bool_t is_soap_11 = axis2_msg_ctx_get_is_soap_11(msg_ctx, env);
+ axutil_array_list_t* mime_parts = axis2_msg_ctx_get_mime_parts(msg_ctx, env);
+
+ Axis2QpidSender qpid_sender(destination_info->broker_ip,
+ destination_info->broker_port, env);
+
+ status = qpid_sender.ServerSend(request_content, destination_info->queue_name,
+ is_soap_11, content_type, soap_action, mime_parts);
+
+ AXIS2_FREE(env->allocator, destination_info->broker_ip);
+ AXIS2_FREE(env->allocator, destination_info->queue_name);
+ AXIS2_FREE(env->allocator, destination_info);
+
+ return status;
}
#ifdef __cplusplus
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/sender/qpid_sender/axis2_qpid_sender_interface.h Wed Nov 19 22:28:35 2008
@@ -20,28 +20,45 @@
#include <axis2_util.h>
#include <axis2_conf_init.h>
+#include <axis2_amqp_util.h>
#ifdef __cplusplus
extern "C"
{
#endif
-axis2_char_t* AXIS2_CALL
-axis2_qpid_client_request (const axis2_char_t* request_content,
- const axis2_char_t* qpid_broker_ip,
- int qpid_broker_port,
- const axutil_env_t* env);
-
-void AXIS2_CALL
-axis2_qpid_server_send (const axis2_char_t* request_content,
- const axis2_char_t* qpid_broker_ip,
- int qpid_broker_port,
- const axis2_char_t* to);
-
-void AXIS2_CALL
-axis2_qpid_client_send (const axis2_char_t* request_content,
- const axis2_char_t* qpid_broker_ip,
- int qpid_broker_port);
+AXIS2_EXTERN axis2_amqp_binary_data_buffer_t* AXIS2_CALL
+axis2_qpid_client_send_receive(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx);
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_client_send_robust(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx);
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_client_send_dual(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* reply_to_queue_name,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx);
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_server_send(
+ const axis2_char_t* request_content,
+ const axutil_env_t* env,
+ const axis2_char_t* content_type,
+ const axis2_char_t* soap_action,
+ axis2_msg_ctx_t* msg_ctx);
#ifdef __cplusplus
}
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/Makefile.am?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/Makefile.am (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/Makefile.am Wed Nov 19 22:28:35 2008
@@ -17,6 +17,7 @@
$(top_builddir)/src/core/engine/libaxis2_engine.la \
$(top_builddir)/neethi/src/libneethi.la \
$(top_builddir)/src/core/transport/amqp/receiver/libaxis2_amqp_receiver.la \
+ $(top_builddir)/src/core/transport/amqp/util/libaxis2_amqp_util.la \
-lpthread
INCLUDES = -I$(top_builddir)/include \
@@ -27,5 +28,6 @@
-I$(top_builddir)/src/core/engine \
-I$(top_builddir)/src/core/transport/amqp/receiver \
-I$(top_builddir)/src/core/transport/amqp/receiver/qpid_receiver \
+ -I$(top_builddir)/src/core/transport/amqp/util \
-I$(top_builddir)/util/include \
-I$(top_builddir)/axiom/include
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.c Wed Nov 19 22:28:35 2008
@@ -29,100 +29,96 @@
#include <axis2_qpid_receiver_interface.h>
axis2_transport_receiver_t* receiver = NULL;
-axutil_env_t* server_env = NULL;
+axutil_env_t* server_env = NULL;
axutil_env_t*
-init_server_env (axutil_allocator_t* allocator,
- const axis2_char_t* log_file_name)
+init_server_env(axutil_allocator_t* allocator,
+ const axis2_char_t* log_file_name)
{
- axutil_error_t* error = axutil_error_create (allocator);
- axutil_log_t* log = axutil_log_create (allocator, NULL, log_file_name);
- axutil_thread_pool_t* thread_pool = axutil_thread_pool_init (allocator);
- axutil_env_t* env = axutil_env_create_with_error_log_thread_pool (allocator,
- error,
- log,
- thread_pool);
+ axutil_error_t* error = axutil_error_create(allocator);
+ axutil_log_t* log = axutil_log_create(allocator, NULL, log_file_name);
+ axutil_thread_pool_t* thread_pool = axutil_thread_pool_init(allocator);
+ axutil_env_t* env = axutil_env_create_with_error_log_thread_pool(allocator,
+ error, log, thread_pool);
- axiom_xml_reader_init ();
+ axiom_xml_reader_init();
return env;
}
void
-server_exit (int status)
+server_exit(int status)
{
if (receiver)
{
- axis2_transport_receiver_free (receiver, server_env);
+ axis2_transport_receiver_free(receiver, server_env);
}
if (server_env)
{
- axutil_env_free (server_env);
+ axutil_env_free(server_env);
}
- axiom_xml_reader_cleanup ();
+ axiom_xml_reader_cleanup();
- exit (status);
+ exit(status);
}
void
-show_usage (axis2_char_t* prog_name)
+show_usage(axis2_char_t* prog_name)
{
- fprintf (stdout, "\n Usage : %s", prog_name);
- fprintf (stdout, " [-i QPID_BROKER_IP]");
- fprintf (stdout, " [-p QPID_BROKER_PORT]");
- fprintf (stdout, " [-r REPO_PATH]");
- fprintf (stdout, " [-l LOG_LEVEL]");
- fprintf (stdout, " [-f LOG_FILE]\n");
- fprintf (stdout, " [-s LOG_FILE_SIZE]\n");
- fprintf (stdout, " Options :\n");
- fprintf (stdout, "\t-i QPID_BROKER_IP \t Qpid broker IP, default is 127.0.0.1\n");
- fprintf (stdout, "\t-p QPID_BROKER_PORT \t the port on which the Qpid broker listens, default is 5672\n");
- fprintf (stdout, "\t-r REPO_PATH \t\t repository path, default is ../\n");
- fprintf (stdout,
- "\t-l LOG_LEVEL\t\t log level, available log levels:"
- "\n\t\t\t\t\t 0 - critical 1 - errors 2 - warnings"
- "\n\t\t\t\t\t 3 - information 4 - debug 5- user 6 - trace"
- "\n\t\t\t\t\t Default log level is 4(debug).\n");
+ fprintf(stdout, "\n Usage : %s", prog_name);
+ fprintf(stdout, " [-i QPID_BROKER_IP]");
+ fprintf(stdout, " [-p QPID_BROKER_PORT]");
+ fprintf(stdout, " [-r REPO_PATH]");
+ fprintf(stdout, " [-l LOG_LEVEL]");
+ fprintf(stdout, " [-f LOG_FILE]\n");
+ fprintf(stdout, " [-s LOG_FILE_SIZE]\n");
+ fprintf(stdout, " Options :\n");
+ fprintf(stdout, "\t-i QPID_BROKER_IP \t Qpid broker IP, default is 127.0.0.1\n");
+ fprintf(stdout, "\t-p QPID_BROKER_PORT \t the port on which the Qpid broker listens, default is 5672\n");
+ fprintf(stdout, "\t-r REPO_PATH \t\t repository path, default is ../\n");
+ fprintf(stdout,
+ "\t-l LOG_LEVEL\t\t log level, available log levels:"
+ "\n\t\t\t\t\t 0 - critical 1 - errors 2 - warnings"
+ "\n\t\t\t\t\t 3 - information 4 - debug 5- user 6 - trace"
+ "\n\t\t\t\t\t Default log level is 4(debug).\n");
#ifndef WIN32
- fprintf (stdout,
- "\t-f LOG_FILE\t\t log file, default is $AXIS2C_HOME/logs/axis2.log"
- "\n\t\t\t\t or axis2.log in current folder if AXIS2C_HOME not set\n");
+ fprintf(stdout,
+ "\t-f LOG_FILE\t\t log file, default is $AXIS2C_HOME/logs/axis2.log"
+ "\n\t\t\t\t or axis2.log in current folder if AXIS2C_HOME not set\n");
#else
- fprintf (stdout,
- "\t-f LOG_FILE\t\t log file, default is %%AXIS2C_HOME%%\\logs\\axis2.log"
- "\n\t\t\t\t or axis2.log in current folder if AXIS2C_HOME not set\n");
+ fprintf(stdout,
+ "\t-f LOG_FILE\t\t log file, default is %%AXIS2C_HOME%%\\logs\\axis2.log"
+ "\n\t\t\t\t or axis2.log in current folder if AXIS2C_HOME not set\n");
#endif
- fprintf (stdout,
- "\t-s LOG_FILE_SIZE\t Maximum log file size in mega bytes, default maximum size is 1MB.\n");
- fprintf (stdout, " Help :\n\t-h \t\t\t display this help screen.\n\n");
+ fprintf(stdout,
+ "\t-s LOG_FILE_SIZE\t Maximum log file size in mega bytes, default maximum size is 1MB.\n");
+ fprintf(stdout, " Help :\n\t-h \t\t\t display this help screen.\n\n");
}
#ifndef WIN32
void
-sig_handler (int signal)
+sig_handler(int signal)
{
switch (signal)
{
case SIGINT:
- AXIS2_LOG_INFO (server_env->log, "Received signal SIGINT. Server "
- "shutting down");
- axis2_amqp_receiver_stop (receiver, server_env);
- AXIS2_LOG_INFO (server_env->log, "Shutdown complete ...");
+ AXIS2_LOG_INFO(server_env->log, "Received signal SIGINT.Server shutting down");
+ axis2_amqp_receiver_stop(receiver, server_env);
+ AXIS2_LOG_INFO(server_env->log, "Shutdown complete ...");
- server_exit (0);
+ server_exit(0);
case SIGPIPE:
- AXIS2_LOG_INFO (server_env->log, "Received signal SIGPIPE. Client "
- "request serve aborted");
+ AXIS2_LOG_INFO(server_env->log, "Received signal SIGPIPE.Client request serve aborted");
return;
case SIGSEGV:
- fprintf (stderr, "Received deadly signal SIGSEGV. Terminating ...\n");
+ fprintf(stderr, "Received deadly signal SIGSEGV. Terminating ...\n");
_exit(-1);
}
}
@@ -130,21 +126,20 @@
#endif
int
-main (int argc,
- char* argv[])
+main(int argc, char* argv[])
{
axutil_allocator_t* allocator = NULL;
extern char* optarg;
extern int optopt;
int c;
const axis2_char_t* qpid_broker_ip = NULL;
- int qpid_broker_port = -1;
+ int qpid_broker_port = AXIS2_QPID_NULL_BROKER_PORT;
const axis2_char_t* repo_path = AXIS2_AMQP_SERVER_REPO_PATH;
axutil_log_levels_t log_level = AXIS2_LOG_LEVEL_DEBUG;
const axis2_char_t* log_file_name = AXIS2_AMQP_SERVER_LOG_FILE_NAME;
int log_file_size = AXUTIL_LOG_FILE_SIZE;
- while ((c = AXIS2_GETOPT (argc, argv, "i:p:r:l:f:s:h")) != -1)
+ while ((c = AXIS2_GETOPT(argc, argv, "i:p:r:l:f:s:h")) != -1)
{
switch (c)
{
@@ -153,7 +148,7 @@
break;
case 'p':
- qpid_broker_port = AXIS2_ATOI (optarg);
+ qpid_broker_port = AXIS2_ATOI(optarg);
break;
case 'r':
@@ -161,7 +156,7 @@
break;
case 'l':
- log_level = AXIS2_ATOI (optarg);
+ log_level = AXIS2_ATOI(optarg);
if (log_level < AXIS2_LOG_LEVEL_CRITICAL)
log_level = AXIS2_LOG_LEVEL_CRITICAL;
if (log_level > AXIS2_LOG_LEVEL_TRACE)
@@ -173,66 +168,64 @@
break;
case 's':
- log_file_size = 1024 * 1024 * AXIS2_ATOI (optarg);
+ log_file_size = 1024 * 1024 * AXIS2_ATOI(optarg);
break;
case 'h':
- show_usage (argv[0]);
+ show_usage(argv[0]);
return 0;
case ':':
- fprintf (stderr, "\nOption -%c requires an operand\n", optopt);
- show_usage (argv[0]);
+ fprintf(stderr, "\nOption -%c requires an operand\n", optopt);
+ show_usage(argv[0]);
return -1;
case '?':
- if (isprint (optopt))
- fprintf (stderr, "\nUnknown option `-%c'.\n", optopt);
- show_usage (argv[0]);
+ if (isprint(optopt))
+ fprintf(stderr, "\nUnknown option `-%c'.\n", optopt);
+ show_usage(argv[0]);
return -1;
}
}
- allocator = axutil_allocator_init (NULL);
+ allocator = axutil_allocator_init(NULL);
if (!allocator)
{
- server_exit (-1);
+ server_exit(-1);
}
- server_env = init_server_env (allocator, log_file_name);
+ server_env = init_server_env(allocator, log_file_name);
server_env->log->level = log_level;
server_env->log->size = log_file_size;
- axutil_error_init ();
+ axutil_error_init();
#ifndef WIN32
- signal (SIGINT, sig_handler);
- signal (SIGPIPE, sig_handler);
+ signal(SIGINT, sig_handler);
+ signal(SIGPIPE, sig_handler);
#endif
- AXIS2_LOG_INFO (server_env->log, "Starting Axis2 AMQP Server ...");
- AXIS2_LOG_INFO (server_env->log, "Repo Location : %s", repo_path);
+ AXIS2_LOG_INFO(server_env->log, "Starting Axis2 AMQP Server ...");
+ AXIS2_LOG_INFO(server_env->log, "Repo Location : %s", repo_path);
- receiver = axis2_amqp_receiver_create (server_env,
- repo_path,
- qpid_broker_ip,
- qpid_broker_port);
+ receiver = axis2_amqp_receiver_create(server_env, repo_path,
+ qpid_broker_ip, qpid_broker_port);
if (!receiver)
{
- AXIS2_LOG_ERROR (server_env->log, AXIS2_LOG_SI,
- "Server creation failed: Error code:" " %d :: %s",
- server_env->error->error_number,
- AXIS2_ERROR_GET_MESSAGE (server_env->error));
- server_exit (-1);
+ AXIS2_LOG_ERROR(server_env->log, AXIS2_LOG_SI,
+ "Server creation failed: Error code:" " %d :: %s",
+ server_env->error->error_number,
+ AXIS2_ERROR_GET_MESSAGE(server_env->error));
+ server_exit(-1);
}
- if (axis2_amqp_receiver_start (receiver, server_env) == AXIS2_FAILURE)
- {
- AXIS2_LOG_ERROR (server_env->log, AXIS2_LOG_SI,
- "Server start failed: Error code:" " %d :: %s",
- server_env->error->error_number,
- AXIS2_ERROR_GET_MESSAGE (server_env->error));
- server_exit (-1);
+ if (axis2_amqp_receiver_start(receiver, server_env) == AXIS2_FAILURE)
+ {
+ AXIS2_LOG_ERROR(server_env->log, AXIS2_LOG_SI,
+ "Server start failed: Error code:" " %d :: %s",
+ server_env->error->error_number,
+ AXIS2_ERROR_GET_MESSAGE(server_env->error));
+ server_exit(-1);
}
return 0;
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/server/axis2_amqp_server/axis2_amqp_server.h Wed Nov 19 22:28:35 2008
@@ -18,26 +18,24 @@
#ifndef AXIS2_AMQP_SERVER_H
#define AXIS2_AMQP_SERVER_H
+#include <axis2_amqp_defines.h>
#include <platforms/axutil_platform_auto_sense.h>
axutil_env_t*
-init_server_env (axutil_allocator_t* allocator,
- const axis2_char_t* log_file_name);
+init_server_env(axutil_allocator_t* allocator,
+ const axis2_char_t* log_file_name);
void
-server_exit (int status);
+server_exit(int status);
void
-show_usage (axis2_char_t* prog_name);
+show_usage(axis2_char_t* prog_name);
#ifndef WIN32
void
-sig_handler (int signal);
+sig_handler(int signal);
#endif
-#define AXIS2_AMQP_SERVER_LOG_FILE_NAME "axis2_amqp_server.log"
-#define AXIS2_AMQP_SERVER_REPO_PATH "../"
-
#endif
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/Makefile.am?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/Makefile.am (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/Makefile.am Wed Nov 19 22:28:35 2008
@@ -1,19 +1,18 @@
lib_LTLIBRARIES = libaxis2_amqp_util.la
+
+
libaxis2_amqp_util_la_SOURCES = axis2_amqp_util.c
+
libaxis2_amqp_util_la_LIBADD = $(top_builddir)/util/src/libaxutil.la
-libaxis2_amqp_util_la_SOURCES = axis2_amqp_util.c
-
-libaxis2_amqp_util_la_LIBADD = $(top_builddir)/util/src/libaxutil.la
-
-libaxis2_amqp_util_la_LDFLAGS = -version-info $(VERSION_NO)
-
-INCLUDES = -I$(top_builddir)/include \
- -I$(top_builddir)/src/core/transport/amqp/util \
- -I$(top_builddir)/src/core/description \
- -I$(top_builddir)/src/core/context \
- -I$(top_builddir)/src/core/engine \
- -I$(top_builddir)/src/core/deployment \
- -I$(top_builddir)/util/include \
- -I$(top_builddir)/axiom/include
+
libaxis2_amqp_util_la_LDFLAGS = -version-info $(VERSION_NO)
+
INCLUDES = -I$(top_builddir)/include \
+
-I$(top_builddir)/src/core/transport/amqp/util \
+
-I$(top_builddir)/src/core/transport/amqp/sender/qpid_sender \
+
-I$(top_builddir)/src/core/description \
+
-I$(top_builddir)/src/core/context \
+
-I$(top_builddir)/src/core/engine \
+
-I$(top_builddir)/src/core/deployment \
+
-I$(top_builddir)/util/include \
+
-I$(top_builddir)/axiom/include
Modified: webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h?rev=719179&r1=719178&r2=719179&view=diff
==============================================================================
--- webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h (original)
+++ webservices/axis2/trunk/c/src/core/transport/amqp/util/axis2_amqp_defines.h Wed Nov 19 22:28:35 2008
@@ -18,10 +18,43 @@
#ifndef AXIS2_AMQP_DEFINES_H
#define AXIS2_AMQP_DEFINES_H
+#include <axiom_mime_const.h>
+
#define AXIS2_AMQP_EXCHANGE_DIRECT "amq.direct"
-#define AXIS2_AMQP_RECEIVER_QUEUE_BIND_KEY "axis2.amqp.receiver.queue.bind.key"
-#define AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO "reply_to"
-#define AXIS2_AMQP_MSG_CTX_PROPERTY_QPID_BROKER_IP "qpid_broker_ip"
-#define AXIS2_AMQP_MSG_CTX_PROPERTY_QPID_BROKER_PORT "qpid_broker_port"
+
+#define AXIS2_AMQP_CONF_QPID_BROKER_IP "qpid_broker_ip"
+#define AXIS2_AMQP_CONF_QPID_BROKER_PORT "qpid_broker_port"
+
+#define AXIS2_QPID_DEFAULT_BROKER_IP "127.0.0.1"
+#define AXIS2_QPID_DEFAULT_BROKER_PORT 5672
+#define AXIS2_QPID_NULL_BROKER_PORT 0
+
+#define AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP "qpid_broker_ip"
+#define AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT "qpid_broker_port"
+#define AXIS2_AMQP_CONF_CTX_PROPERTY_QUEUE_NAME "queue_name"
+
+#define AXIS2_AMQP_MSG_CTX_PROPERTY_REPLY_TO "qpid_reply_to"
+
+#define AXIS2_AMQP_HEADER_ACCEPT_TEXT_XML "text/xml"
+#define AXIS2_AMQP_HEADER_ACCEPT_APPL_SOAP "application/soap+xml"
+#define AXIS2_AMQP_HEADER_ACCEPT_MULTIPART_RELATED AXIOM_MIME_TYPE_MULTIPART_RELATED
+#define AXIS2_AMQP_HEADER_CONTENT_TYPE_MIME_BOUNDARY "boundary"
+#define AXIS2_AMQP_HEADER_SOAP_ACTION "SOAPAction"
+#define AXIS2_AMQP_HEADER_CONTENT_TYPE "Content-Type"
+
+#define AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX "TempQueue"
+
+#define AXIS2_AMQP_SERVER_LOG_FILE_NAME "axis2_amqp_server.log"
+#define AXIS2_AMQP_SERVER_REPO_PATH "../"
+
+#define AXIS2_AMQP_EPR_PREFIX "amqp:"
+#define AXIS2_AMQP_EPR_SERVICE_PREFIX "services"
+#define AXIS2_AMQP_EPR_ANON_SERVICE_NAME "__ANONYMOUS_SERVICE__"
+
+#define AXIS2_AMQP_EQ '='
+#define AXIS2_AMQP_SEMI_COLON ';'
+#define AXIS2_AMQP_ESC_NULL '\0'
+#define AXIS2_AMQP_DOUBLE_QUOTE '"'
+#define AXIS2_AMQP_B_SLASH '\\'
#endif