You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2014/02/05 22:58:24 UTC
svn commit: r1564952 [2/4] - in /cxf/trunk: parent/
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ rt/transp...
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.security.SecurityContext;
+import org.apache.cxf.transport.jms.spec.JMSSpecConstants;
+import org.apache.cxf.transport.jms.uri.JMSEndpoint;
+import org.apache.cxf.transport.jms.uri.JMSEndpointParser;
+import org.apache.cxf.transport.jms.util.JMSMessageConverter;
+
+public final class JMSMessageUtils {
+ private static final Logger LOG = LogUtils.getL7dLogger(JMSMessageUtils.class);
+
+ // Utility class with static members only: the private constructor prevents instantiation.
+ private JMSMessageUtils() {
+
+ }
+
+ /**
+  * Creates a JMS message whose concrete type is selected by {@code messageType} and fills it
+  * with the given payload.
+  *
+  * @param payload the message payload; cast to {@code String} for text messages and to
+  *                {@code byte[]} for every other message type
+  * @param session the JMS session used to create the message
+  * @param messageType {@link JMSConstants#TEXT_MESSAGE_TYPE} yields a text message,
+  *                    {@link JMSConstants#BYTE_MESSAGE_TYPE} a bytes message and any other
+  *                    value an object message
+  * @return the newly created message carrying the payload
+  * @throws JMSException if the JMS provider reports an error while creating or filling the message
+  */
+ static Message createAndSetPayload(Object payload, Session session, String messageType)
+     throws JMSException {
+     if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
+         return session.createTextMessage((String)payload);
+     }
+     if (JMSConstants.BYTE_MESSAGE_TYPE.equals(messageType)) {
+         BytesMessage bytesMessage = session.createBytesMessage();
+         bytesMessage.writeBytes((byte[])payload);
+         return bytesMessage;
+     }
+     // Every other message type is transported as an object message wrapping the byte array.
+     ObjectMessage objectMessage = session.createObjectMessage();
+     objectMessage.setObject((byte[])payload);
+     return objectMessage;
+ }
+
+ /**
+  * Converts an incoming JMS message into a new CXF message. The JMS headers and properties are
+  * copied first, then the payload is attached.
+  *
+  * @param message the incoming JMS message
+  * @param headerType the key under which the JMS headers object is stored in the CXF message
+  * @return the populated CXF message
+  * @throws UnsupportedEncodingException if a content type names a charset that cannot be mapped
+  * @throws JMSException if reading from the JMS message fails
+  */
+ public static org.apache.cxf.message.Message asCXFMessage(Message message, String headerType)
+     throws UnsupportedEncodingException, JMSException {
+     final org.apache.cxf.message.Message cxfMessage = new MessageImpl();
+     populateIncomingContext(message, cxfMessage, headerType);
+     retrieveAndSetPayload(cxfMessage, message);
+     return cxfMessage;
+ }
+
+ /**
+ * Extract the payload of an incoming JMS message
+ *
+ * @param inMessage
+ * @param message the incoming message
+ * @throws UnsupportedEncodingException
+ * @throws JMSException
+ */
+ private static void retrieveAndSetPayload(org.apache.cxf.message.Message inMessage, Message message)
+ throws UnsupportedEncodingException, JMSException {
+ String messageType = null;
+ Object converted = new JMSMessageConverter().fromMessage(message);
+ if (converted instanceof String) {
+ inMessage.setContent(Reader.class, new StringReader((String)converted));
+ messageType = JMSConstants.TEXT_MESSAGE_TYPE;
+ } else if (converted instanceof byte[]) {
+ inMessage.setContent(InputStream.class, new ByteArrayInputStream((byte[])converted));
+ messageType = JMSConstants.BYTE_MESSAGE_TYPE;
+ } else {
+ messageType = "unknown";
+ }
+ Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)inMessage
+ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+ if (headers == null) {
+ headers = new TreeMap<String, List<String>>();
+ inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers);
+ }
+ headers.put(JMSSpecConstants.JMS_MESSAGE_TYPE, Collections.singletonList(messageType));
+ }
+
+ /**
+  * Copies the JMS headers and properties of an incoming JMS message into the CXF message.
+  * <p>
+  * The standard JMS headers are stored in the {@link JMSMessageHeadersType} kept in
+  * {@code inMessage} under the key {@code messageType} (created if absent). Every JMS property
+  * is read as a string and recorded both as a {@link JMSPropertyType} and as a protocol header.
+  * Content type and response code properties are additionally mapped onto the CXF message and
+  * its exchange; properties with a {@code null} value are skipped for that mapping.
+  *
+  * @param message the incoming JMS message
+  * @param inMessage the CXF message to populate
+  * @param messageType the key under which the JMS headers object is stored in {@code inMessage}
+  * @throws UnsupportedEncodingException if a content type names a charset that cannot be mapped
+  * @throws JMSException if reading a header or property from the JMS message fails
+  */
+ private static void populateIncomingContext(javax.jms.Message message,
+     org.apache.cxf.message.Message inMessage, String messageType)
+     throws UnsupportedEncodingException, JMSException {
+     JMSMessageHeadersType messageProperties = getOrCreateHeader(inMessage, messageType);
+     messageProperties.setJMSCorrelationID(message.getJMSCorrelationID());
+     messageProperties.setJMSDeliveryMode(Integer.valueOf(message.getJMSDeliveryMode()));
+     messageProperties.setJMSExpiration(Long.valueOf(message.getJMSExpiration()));
+     messageProperties.setJMSMessageID(message.getJMSMessageID());
+     messageProperties.setJMSPriority(Integer.valueOf(message.getJMSPriority()));
+     messageProperties.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
+     messageProperties.setJMSTimeStamp(Long.valueOf(message.getJMSTimestamp()));
+     messageProperties.setJMSType(message.getJMSType());
+
+     // instanceof is false for null, so a missing reply destination needs no separate check.
+     Destination replyTo = message.getJMSReplyTo();
+     if (replyTo instanceof Queue) {
+         messageProperties.setJMSReplyTo(((Queue)replyTo).getQueueName());
+     } else if (replyTo instanceof Topic) {
+         messageProperties.setJMSReplyTo(((Topic)replyTo).getTopicName());
+     }
+
+     Map<String, List<String>> protHeaders = new TreeMap<String, List<String>>();
+     List<JMSPropertyType> props = messageProperties.getProperty();
+     Enumeration<String> enm = CastUtils.cast(message.getPropertyNames());
+     while (enm.hasMoreElements()) {
+         String name = enm.nextElement();
+         String val = message.getStringProperty(name);
+         JMSPropertyType prop = new JMSPropertyType();
+         prop.setName(name);
+         prop.setValue(val);
+         props.add(prop);
+
+         protHeaders.put(name, Collections.singletonList(val));
+
+         // The null check must guard BOTH property names. The original expression
+         // "a || b && val != null" only guarded the second one, so a null content type
+         // could reach getEncoding() and fail with a NullPointerException.
+         boolean isContentType = name.equals(org.apache.cxf.message.Message.CONTENT_TYPE)
+             || name.equals(JMSConstants.JMS_CONTENT_TYPE);
+         if (isContentType && val != null) {
+             inMessage.put(org.apache.cxf.message.Message.CONTENT_TYPE, val);
+             // set the message encoding
+             inMessage.put(org.apache.cxf.message.Message.ENCODING, getEncoding(val));
+         }
+         // A null response code cannot be parsed; skip it rather than fail in Integer.valueOf.
+         // NOTE(review): assumes inMessage already has an exchange attached - confirm with callers.
+         if (val != null && name.equals(org.apache.cxf.message.Message.RESPONSE_CODE)) {
+             inMessage.getExchange().put(org.apache.cxf.message.Message.RESPONSE_CODE,
+                 Integer.valueOf(val));
+         }
+     }
+     inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders);
+
+     populateIncomingMessageProperties(message, inMessage, messageProperties);
+ }
+
+ /**
+  * Copies the SOAP over JMS properties that are present on the JMS message into the JMS
+  * headers object, and derives request URI markers, content type and encoding from them.
+  *
+  * @param jmsMessage the incoming JMS message whose properties are read
+  * @param inMessage the CXF message receiving content type, encoding and protocol headers
+  * @param messageProperties the JMS headers object receiving the SOAP over JMS values
+  * @throws UnsupportedEncodingException if the SOAP JMS content type names a charset that
+  *         {@code getEncoding} rejects
+  * @throws JMSException if a property cannot be read from the JMS message
+  */
+ private static void populateIncomingMessageProperties(Message jmsMessage,
+ org.apache.cxf.message.Message inMessage,
+ JMSMessageHeadersType messageProperties)
+ throws UnsupportedEncodingException, JMSException {
+ // Each SOAP over JMS property is copied only when it actually exists on the message.
+ if (jmsMessage.propertyExists(JMSSpecConstants.TARGETSERVICE_FIELD)) {
+ messageProperties.setSOAPJMSTargetService(jmsMessage
+ .getStringProperty(JMSSpecConstants.TARGETSERVICE_FIELD));
+ }
+ if (jmsMessage.propertyExists(JMSSpecConstants.BINDINGVERSION_FIELD)) {
+ messageProperties.setSOAPJMSBindingVersion(jmsMessage
+ .getStringProperty(JMSSpecConstants.BINDINGVERSION_FIELD));
+ }
+ if (jmsMessage.propertyExists(JMSSpecConstants.CONTENTTYPE_FIELD)) {
+ messageProperties.setSOAPJMSContentType(jmsMessage
+ .getStringProperty(JMSSpecConstants.CONTENTTYPE_FIELD));
+ }
+ if (jmsMessage.propertyExists(JMSSpecConstants.CONTENTENCODING_FIELD)) {
+ messageProperties.setSOAPJMSContentEncoding(jmsMessage
+ .getStringProperty(JMSSpecConstants.CONTENTENCODING_FIELD));
+ }
+ if (jmsMessage.propertyExists(JMSSpecConstants.SOAPACTION_FIELD)) {
+ messageProperties.setSOAPJMSSOAPAction(jmsMessage
+ .getStringProperty(JMSSpecConstants.SOAPACTION_FIELD));
+ }
+ if (jmsMessage.propertyExists(JMSSpecConstants.ISFAULT_FIELD)) {
+ messageProperties
+ .setSOAPJMSIsFault(jmsMessage.getBooleanProperty(JMSSpecConstants.ISFAULT_FIELD));
+ }
+ if (jmsMessage.propertyExists(JMSSpecConstants.REQUESTURI_FIELD)) {
+ messageProperties.setSOAPJMSRequestURI(jmsMessage
+ .getStringProperty(JMSSpecConstants.REQUESTURI_FIELD));
+
+ // Reuse the protocol header map if one is registered, otherwise register a new one.
+ Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)inMessage
+ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+ if (headers == null) {
+ headers = new TreeMap<String, List<String>>();
+ inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers);
+ }
+ // Parse the request URI only to flag two conditions as protocol headers: the URI carries
+ // a target service parameter, or the URI cannot be parsed at all.
+ try {
+ JMSEndpoint endpoint = JMSEndpointParser.createEndpoint(jmsMessage
+ .getStringProperty(JMSSpecConstants.REQUESTURI_FIELD));
+ if (endpoint.getParameter(JMSSpecConstants.TARGETSERVICE_PARAMETER_NAME) != null) {
+ headers.put(JMSSpecConstants.TARGET_SERVICE_IN_REQUESTURI,
+ Collections.singletonList("true"));
+ }
+ } catch (Exception e) {
+ // Any failure in the try block is reported as a malformed request URI; the exception
+ // itself is not propagated or logged.
+ headers.put(JMSSpecConstants.MALFORMED_REQUESTURI, Collections.singletonList("true"));
+ }
+ }
+
+ // A SOAP JMS content type, when set, overwrites the CXF content type and encoding.
+ if (messageProperties.isSetSOAPJMSContentType()) {
+ String contentType = messageProperties.getSOAPJMSContentType();
+ inMessage.put(org.apache.cxf.message.Message.CONTENT_TYPE, contentType);
+ // set the message encoding
+ inMessage.put(org.apache.cxf.message.Message.ENCODING, getEncoding(contentType));
+ }
+
+ }
+
+ /**
+  * Builds a {@link SecurityContext} whose principal is the sender recorded on the JMS message.
+  * <p>
+  * The user name is read from the {@code JMSXUserID} property. If that is absent and the
+  * configuration reports Tibco EMS as the provider, {@code JMS_TIBCO_SENDER} is used instead.
+  * See <a href="https://issues.apache.org/jira/browse/CXF-2055">CXF-2055</a>.
+  *
+  * @param message the JMS message to read the user information from
+  * @param config the JMS configuration, consulted for the Tibco EMS fallback
+  * @return a security context exposing the sender as principal and granting no roles, or
+  *         {@code null} if the message carries no user name
+  * @throws JMSException if a property cannot be read from the message
+  */
+ public static SecurityContext buildSecurityContext(javax.jms.Message message,
+     JMSConfiguration config) throws JMSException {
+     String userId = message.getStringProperty("JMSXUserID");
+     if (userId == null && config.isJmsProviderTibcoEms()) {
+         userId = message.getStringProperty("JMS_TIBCO_SENDER");
+     }
+     if (userId == null) {
+         return null;
+     }
+
+     // Anonymous classes may only capture final locals, hence the copy.
+     final String principalName = userId;
+     final Principal sender = new Principal() {
+         public String getName() {
+             return principalName;
+         }
+     };
+
+     return new SecurityContext() {
+         public Principal getUserPrincipal() {
+             return sender;
+         }
+
+         public boolean isUserInRole(String role) {
+             return false;
+         }
+     };
+ }
+
+ /**
+  * Determines the character encoding named by the {@code charset} parameter of a content type.
+  * <p>
+  * The raw charset value (or {@code null} when the content type is {@code null} or has no
+  * charset parameter) is handed to {@code HttpHeaderHelper.mapCharset} with {@code "UTF-8"}
+  * as the default.
+  *
+  * @param ct the content type value, for example {@code text/xml; charset=utf-8}; may be
+  *           {@code null}
+  * @return the encoding name returned by {@code HttpHeaderHelper.mapCharset}
+  * @throws UnsupportedEncodingException if {@code HttpHeaderHelper.mapCharset} returns
+  *         {@code null} for the charset
+  */
+ static String getEncoding(String ct) throws UnsupportedEncodingException {
+     String enc = null;
+
+     if (ct != null) {
+         // Lower-case with a fixed locale: under e.g. a Turkish default locale, "ISO-8859-1"
+         // would otherwise become "\u0131so-8859-1" and no longer be recognised.
+         String contentType = ct.toLowerCase(java.util.Locale.ROOT);
+         String[] tokens = StringUtils.split(contentType, ";");
+         for (String token : tokens) {
+             int index = token.indexOf("charset=");
+             if (index >= 0) {
+                 // 8 == "charset=".length()
+                 enc = token.substring(index + 8);
+                 break;
+             }
+         }
+     }
+
+     String normalizedEncoding = HttpHeaderHelper.mapCharset(enc, "UTF-8");
+     if (normalizedEncoding == null) {
+         String m = new org.apache.cxf.common.i18n.Message("INVALID_ENCODING_MSG", LOG, new Object[] {
+             enc
+         }).toString();
+         LOG.log(Level.WARNING, m);
+         throw new UnsupportedEncodingException(m);
+     }
+
+     return normalizedEncoding;
+ }
+
+ private static String getContentType(org.apache.cxf.message.Message message) {
+ String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
+ String enc = (String)message.get(org.apache.cxf.message.Message.ENCODING);
+ // add the encoding information
+ if (null != contentType) {
+ if (enc != null && contentType.indexOf("charset=") == -1
+ && !contentType.toLowerCase().contains("multipart/related")) {
+ contentType = contentType + "; charset=" + enc;
+ }
+ } else if (enc != null) {
+ contentType = "text/xml; charset=" + enc;
+ } else {
+ contentType = "text/xml";
+ }
+
+ // Retrieve or create protocol headers
+ Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)message
+ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+ if (null == headers) {
+ headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
+ message.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers);
+ }
+ return contentType;
+ }
+
+ /**
+  * Looks up the first {@code Content-Encoding} protocol header of a message.
+  *
+  * @param message the CXF message to inspect
+  * @return the first header value, or {@code null} if the message has no protocol headers or no
+  *         such header
+  */
+ private static String getContentEncoding(org.apache.cxf.message.Message message) {
+     Map<String, List<String>> protocolHeaders
+         = CastUtils.cast((Map<?, ?>)message.get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+     if (protocolHeaders == null) {
+         return null;
+     }
+     List<String> values = protocolHeaders.get("Content-Encoding");
+     if (values == null || values.isEmpty()) {
+         return null;
+     }
+     return values.get(0);
+ }
+
+ /**
+  * Builds the outgoing JMS message for a CXF message: creates the message with its payload,
+  * fills in the JMS headers object from configuration defaults and the CXF message, copies the
+  * resulting properties onto the JMS message and finally sets the correlation id.
+  *
+  * @param jmsConfig the JMS configuration supplying defaults, target service and request URI
+  * @param outMessage the outgoing CXF message
+  * @param payload the payload to send
+  * @param messageType the JMS message type to create
+  * @param session the JMS session used to create the message
+  * @param correlationId the JMS correlation id to set
+  * @param headerType the key under which the JMS headers object is kept in {@code outMessage}
+  * @return the JMS message ready to be sent
+  * @throws JMSException if the JMS provider reports an error
+  */
+ public static Message asJMSMessage(JMSConfiguration jmsConfig,
+     org.apache.cxf.message.Message outMessage,
+     Object payload,
+     String messageType,
+     Session session,
+     String correlationId,
+     String headerType)
+     throws JMSException {
+
+     final Message result = createAndSetPayload(payload, session, messageType);
+     final JMSMessageHeadersType jmsHeaders = getOrCreateHeader(outMessage, headerType);
+
+     prepareJMSMessageHeaderProperties(jmsHeaders, outMessage, jmsConfig);
+     prepareJMSMessageProperties(jmsHeaders, outMessage,
+         jmsConfig.getTargetService(), jmsConfig.getRequestURI());
+     setJMSMessageProperties(result, jmsHeaders);
+
+     result.setJMSCorrelationID(correlationId);
+     return result;
+ }
+
+ /**
+  * Returns the JMS headers object stored in the message under {@code headerName}, creating and
+  * registering a new one when none is present.
+  *
+  * @param message the CXF message holding the headers object
+  * @param headerName the key of the headers object
+  * @return the existing or newly created headers object
+  */
+ private static JMSMessageHeadersType getOrCreateHeader(org.apache.cxf.message.Message message,
+     String headerName) {
+     JMSMessageHeadersType existing = (JMSMessageHeadersType)message.get(headerName);
+     if (existing != null) {
+         return existing;
+     }
+     JMSMessageHeadersType created = new JMSMessageHeadersType();
+     message.put(headerName, created);
+     return created;
+ }
+
+ /**
+ * @param jmsMessage
+ * @param messageProperties
+ */
+ static void setJMSMessageProperties(Message jmsMessage, JMSMessageHeadersType messageProperties)
+ throws JMSException {
+
+ if (messageProperties == null) {
+ return;
+ }
+
+ setProp(jmsMessage, JMSSpecConstants.TARGETSERVICE_FIELD, messageProperties.getSOAPJMSTargetService());
+ setProp(jmsMessage, JMSSpecConstants.BINDINGVERSION_FIELD, messageProperties.getSOAPJMSBindingVersion());
+ setProp(jmsMessage, JMSSpecConstants.CONTENTTYPE_FIELD, messageProperties.getSOAPJMSContentType());
+ setProp(jmsMessage, JMSSpecConstants.CONTENTENCODING_FIELD, messageProperties.getSOAPJMSContentEncoding());
+ setProp(jmsMessage, JMSSpecConstants.SOAPACTION_FIELD, messageProperties.getSOAPJMSSOAPAction());
+ setProp(jmsMessage, JMSSpecConstants.REQUESTURI_FIELD, messageProperties.getSOAPJMSRequestURI());
+
+ if (messageProperties.isSetSOAPJMSIsFault()) {
+ jmsMessage.setBooleanProperty(JMSSpecConstants.ISFAULT_FIELD, messageProperties
+ .isSOAPJMSIsFault());
+ }
+
+ if (messageProperties.isSetProperty()) {
+ for (JMSPropertyType prop : messageProperties.getProperty()) {
+ jmsMessage.setStringProperty(prop.getName(), prop.getValue());
+ }
+ }
+ }
+
+ /**
+  * Sets a string property on the JMS message unless the value is {@code null}.
+  *
+  * @param jmsMessage the JMS message to modify
+  * @param name the property name
+  * @param value the property value; {@code null} leaves the message untouched
+  * @throws JMSException if the JMS provider rejects the property
+  */
+ private static void setProp(Message jmsMessage, String name, String value) throws JMSException {
+     if (value == null) {
+         return;
+     }
+     jmsMessage.setStringProperty(name, value);
+ }
+
+ /**
+ * @param messageProperteis
+ * @param outMessage
+ * @param jmsConfig
+ */
+ private static void prepareJMSMessageHeaderProperties(
+ JMSMessageHeadersType messageProperteis,
+ org.apache.cxf.message.Message outMessage,
+ JMSConfiguration jmsConfig) {
+ if (!messageProperteis.isSetJMSDeliveryMode()) {
+ messageProperteis.setJMSDeliveryMode(jmsConfig.getDeliveryMode());
+ }
+ if (!messageProperteis.isSetTimeToLive()) {
+ messageProperteis.setTimeToLive(jmsConfig.getTimeToLive());
+ }
+ if (!messageProperteis.isSetJMSPriority()) {
+ messageProperteis.setJMSPriority(jmsConfig.getPriority());
+ }
+ }
+
+ /**
+  * Fills the JMS headers object with the message level properties of an outgoing message.
+  * <p>
+  * For SOAP messages (the exchange does not flag the message as REST) the SOAP over JMS values
+  * are set; for REST messages selected CXF message entries are copied as generic properties.
+  * In both cases every protocol header that was present when the method started is appended as
+  * a generic property.
+  *
+  * @param messageProperties the JMS headers object to fill
+  * @param outMessage the outgoing CXF message
+  * @param targetService the SOAP JMS target service used when none is set yet
+  * @param requestURI the SOAP JMS request URI used when none is set yet
+  */
+ private static void prepareJMSMessageProperties(JMSMessageHeadersType messageProperties,
+ org.apache.cxf.message.Message outMessage,
+ String targetService,
+ String requestURI) {
+
+ // Read the protocol headers once, up front. getContentType(), called below, may register a
+ // new empty header map on the message when none exists; that map is NOT reflected in this
+ // local variable, which then stays null.
+ Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)outMessage
+ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+
+ boolean isSoapMessage =
+ !MessageUtils.isTrue(outMessage.getExchange().get(org.apache.cxf.message.Message.REST_MESSAGE));
+
+ if (isSoapMessage) {
+ if (!messageProperties.isSetSOAPJMSTargetService()) {
+ messageProperties.setSOAPJMSTargetService(targetService);
+ }
+ if (!messageProperties.isSetSOAPJMSBindingVersion()) {
+ messageProperties.setSOAPJMSBindingVersion("1.0");
+ }
+ // The content type is always overwritten, unlike the other SOAP JMS values.
+ messageProperties.setSOAPJMSContentType(getContentType(outMessage));
+ if (getContentEncoding(outMessage) != null) {
+ messageProperties.setSOAPJMSContentEncoding(getContentEncoding(outMessage));
+ }
+ // getSoapAction() removes the "SOAPAction" entry from the header map, so that header is
+ // not copied again by the generic loop at the end of this method.
+ String soapAction = getSoapAction(messageProperties, outMessage, headers);
+ if (soapAction != null) {
+ messageProperties.setSOAPJMSSOAPAction(soapAction);
+ }
+ if (!messageProperties.isSetSOAPJMSIsFault()) {
+ // The message counts as a fault when it carries an Exception as content.
+ boolean isFault = outMessage.getContent(Exception.class) != null;
+ messageProperties.setSOAPJMSIsFault(isFault);
+ }
+ if (!messageProperties.isSetSOAPJMSRequestURI()) {
+ messageProperties.setSOAPJMSRequestURI(requestURI);
+ }
+ } else {
+ // REST: requests carry method, URI and accepted content type; responses the status code.
+ if (MessageUtils.isRequestor(outMessage)) {
+ addJMSPropertiesFromMessage(messageProperties,
+ outMessage,
+ org.apache.cxf.message.Message.HTTP_REQUEST_METHOD,
+ org.apache.cxf.message.Message.REQUEST_URI,
+ org.apache.cxf.message.Message.ACCEPT_CONTENT_TYPE);
+ } else {
+ addJMSPropertyFromMessage(messageProperties,
+ outMessage,
+ org.apache.cxf.message.Message.RESPONSE_CODE);
+ }
+ addJMSPropertyFromMessage(messageProperties,
+ outMessage,
+ org.apache.cxf.message.Message.CONTENT_TYPE);
+ }
+ // Append the protocol headers that remain in the map as generic JMS properties.
+ if (headers != null) {
+ for (Map.Entry<String, List<String>> ent : headers.entrySet()) {
+ JMSPropertyType prop = asJmsProperty(ent);
+ messageProperties.getProperty().add(prop);
+ }
+ }
+ }
+
+ /**
+  * Converts a protocol header entry into a JMS property. Multiple header values are joined
+  * with commas; a single value is used as is.
+  * <p>
+  * A header with a {@code null} or empty value list yields a property whose value is left
+  * unset, instead of failing on {@code get(0)} as the previous implementation did.
+  *
+  * @param ent the protocol header entry (name and list of values)
+  * @return a new JMS property named after the header
+  */
+ private static JMSPropertyType asJmsProperty(Map.Entry<String, List<String>> ent) {
+     JMSPropertyType prop = new JMSPropertyType();
+     prop.setName(ent.getKey());
+
+     List<String> values = ent.getValue();
+     if (values == null || values.isEmpty()) {
+         // Nothing to copy: keep the property value unset.
+         return prop;
+     }
+
+     if (values.size() > 1) {
+         StringBuilder b = new StringBuilder();
+         for (String s : values) {
+             if (b.length() > 0) {
+                 b.append(',');
+             }
+             b.append(s);
+         }
+         prop.setValue(b.toString());
+     } else {
+         prop.setValue(values.get(0));
+     }
+     return prop;
+ }
+
+ /**
+  * Resolves the SOAP action of an outgoing message. Sources are consulted in this order: the
+  * {@code SOAPAction} protocol header, the SOAP JMS action already stored in the JMS headers
+  * object, and finally the {@code action} parameter of the message content type.
+  * <p>
+  * Note that the {@code SOAPAction} entry is removed from {@code headers} as a side effect.
+  *
+  * @param messageProperties the JMS headers object
+  * @param outMessage the outgoing CXF message
+  * @param headers the protocol headers, may be {@code null}
+  * @return the SOAP action, or {@code null} if none of the sources provides one
+  */
+ private static String getSoapAction(JMSMessageHeadersType messageProperties,
+     org.apache.cxf.message.Message outMessage,
+     Map<String, List<String>> headers) {
+     String fromHeader = null;
+     if (headers != null) {
+         List<String> values = headers.remove("SOAPAction");
+         if (values != null && !values.isEmpty()) {
+             fromHeader = values.get(0);
+         }
+     }
+     if (fromHeader != null) {
+         return fromHeader;
+     }
+
+     String fromProperties = messageProperties.getSOAPJMSSOAPAction();
+     if (fromProperties != null) {
+         return fromProperties;
+     }
+
+     return extractActionFromSoap12(outMessage);
+ }
+
+ /**
+  * Copies several CXF message entries into the JMS headers object as generic properties, in
+  * the order given. Entries without a value are skipped.
+  *
+  * @param messageProperties the JMS headers object receiving the properties
+  * @param message the CXF message to read from
+  * @param keys the message keys to copy
+  */
+ private static void addJMSPropertiesFromMessage(JMSMessageHeadersType messageProperties,
+     org.apache.cxf.message.Message message,
+     String... keys) {
+     for (int i = 0; i < keys.length; i++) {
+         addJMSPropertyFromMessage(messageProperties, message, keys[i]);
+     }
+ }
+
+ private static void addJMSPropertyFromMessage(JMSMessageHeadersType messageProperties,
+ org.apache.cxf.message.Message message,
+ String key) {
+ Object value = message.get(key);
+ if (value != null) {
+ JMSPropertyType prop = new JMSPropertyType();
+ prop.setName(key);
+ prop.setValue(value.toString());
+ messageProperties.getProperty().add(prop);
+ }
+ }
+
+ /**
+  * Maps a JMS message onto the message type constant used by this transport.
+  *
+  * @param request the JMS message to classify
+  * @return {@link JMSConstants#TEXT_MESSAGE_TYPE} for a {@link TextMessage},
+  *         {@link JMSConstants#BYTE_MESSAGE_TYPE} for a {@link BytesMessage} and
+  *         {@link JMSConstants#BINARY_MESSAGE_TYPE} for anything else
+  */
+ public static String getMessageType(final javax.jms.Message request) {
+     if (request instanceof TextMessage) {
+         return JMSConstants.TEXT_MESSAGE_TYPE;
+     }
+     if (request instanceof BytesMessage) {
+         return JMSConstants.BYTE_MESSAGE_TYPE;
+     }
+     return JMSConstants.BINARY_MESSAGE_TYPE;
+ }
+
+ /**
+  * Extracts the value of the {@code action} parameter from the content type of a message, as
+  * used by SOAP 1.2. Both the quoted form ({@code action="urn:x"}) and the unquoted form
+  * ({@code action=urn:x}) are supported.
+  * <p>
+  * Malformed input is tolerated: a content type ending right after {@code action=} yields an
+  * empty string, and a quoted value without closing quote extends to the end of the content
+  * type. The previous implementation threw {@code StringIndexOutOfBoundsException} in both
+  * cases.
+  *
+  * @param message the CXF message whose content type is inspected
+  * @return the action value, or {@code null} if the message has no content type or the content
+  *         type has no {@code action=} parameter
+  */
+ private static String extractActionFromSoap12(org.apache.cxf.message.Message message) {
+     String ct = (String) message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
+
+     if (ct == null) {
+         return null;
+     }
+
+     int start = ct.indexOf("action=");
+     if (start == -1) {
+         return null;
+     }
+     // 7 == "action=".length(); start now points at the first character of the value.
+     start += 7;
+     if (start >= ct.length()) {
+         return "";
+     }
+
+     int end;
+     if (ct.charAt(start) == '\"') {
+         start++;
+         end = ct.indexOf('\"', start);
+     } else {
+         end = ct.indexOf(';', start);
+     }
+     if (end == -1) {
+         // No terminator found: the value runs to the end of the content type.
+         end = ct.length();
+     }
+     return ct.substring(start, end);
+ }
+
+ /**
+  * Tells whether MTOM is enabled for a message, based on its contextual
+  * {@code MTOM_ENABLED} property.
+  *
+  * @param message the CXF message to inspect
+  * @return {@code true} if {@code MessageUtils.isTrue} accepts the property value
+  */
+ public static boolean isMtomEnabled(final org.apache.cxf.message.Message message) {
+     final Object mtomSetting = message.getContextualProperty(
+         org.apache.cxf.message.Message.MTOM_ENABLED);
+     return MessageUtils.isTrue(mtomSetting);
+ }
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Wed Feb 5 21:58:23 2014
@@ -70,7 +70,7 @@ public class JMSTransportFactory extends
endpointInfo,
target,
true);
- return new JMSConduit(endpointInfo, target, jmsConf, bus);
+ return new JMSConduit(target, jmsConf, bus);
}
/**
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.io.Writer;
+
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+
+/**
+ * Helpers for attaching "send on close" streams to an outgoing message and for closing the
+ * character streams of a message.
+ */
+public final class MessageStreamUtil {
+
+    private MessageStreamUtil() {
+    }
+
+    /**
+     * Set Writer or OutputStream in message that calls the sender on close with
+     * the content of the stream
+     *
+     * @param message where to set the content
+     * @param isTextPayload decides about stream type true:Writer, false: OutputStream
+     * @param sender will be called on close
+     */
+    public static void prepareStream(final Message message, boolean isTextPayload,
+                                     final JMSExchangeSender sender) {
+        if (isTextPayload) {
+            message.setContent(Writer.class, new SendingWriter(sender, message.getExchange()));
+        } else {
+            SendingOutputStream out = new SendingOutputStream(sender, message.getExchange());
+            message.setContent(OutputStream.class, out);
+        }
+    }
+
+    /**
+     * Writer that collects the written text in memory and passes it, together with the
+     * exchange, to the sender when closed.
+     */
+    private static final class SendingWriter extends StringWriter {
+        private final JMSExchangeSender sender;
+        private final Exchange exchange;
+
+        private SendingWriter(JMSExchangeSender sender, Exchange exchange) {
+            this.sender = sender;
+            this.exchange = exchange;
+        }
+
+        @Override
+        public void close() throws IOException {
+            super.close();
+            sender.sendExchange(exchange, toString());
+        }
+    }
+
+    /**
+     * Output stream that passes its bytes, together with the exchange, to the sender from the
+     * {@code doClose()} hook of {@link CachedOutputStream}.
+     */
+    private static final class SendingOutputStream extends CachedOutputStream {
+        private final JMSExchangeSender sender;
+        private final Exchange exchange;
+
+        public SendingOutputStream(JMSExchangeSender sender, Exchange exchange) {
+            this.sender = sender;
+            this.exchange = exchange;
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            this.sender.sendExchange(exchange, getBytes());
+        }
+
+    }
+
+    /**
+     * Closes the {@link Writer} and the {@link Reader} content of a message, if present.
+     * The reader is closed even when closing the writer fails.
+     *
+     * @param msg the message whose character streams are closed
+     * @throws IOException if closing either stream fails
+     */
+    public static void closeStreams(Message msg) throws IOException {
+        try {
+            Writer writer = msg.getContent(Writer.class);
+            if (writer != null) {
+                writer.close();
+            }
+        } finally {
+            // Looked up after the writer was closed, as before, but now also on failure so
+            // that the reader is never left open.
+            Reader reader = msg.getContent(Reader.class);
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.io.Serializable;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Converts JMS messages to objects and vice versa.
+ * <ul>
+ * <li>String &lt;=&gt; {@link javax.jms.TextMessage}</li>
+ * <li>byte[] &lt;=&gt; {@link javax.jms.BytesMessage}</li>
+ * <li>Serializable object &lt;=&gt; {@link javax.jms.ObjectMessage}</li>
+ * </ul>
+ */
+public class JMSMessageConverter {
+
+    /**
+     * Converts an object into a JMS message of the matching type.
+     *
+     * @param object the payload; a {@link Message} is returned unchanged
+     * @param session the JMS session used to create the message
+     * @return the JMS message carrying the payload
+     * @throws JMSException if the JMS provider reports an error
+     * @throws IllegalArgumentException if the object is {@code null} or of an unsupported type
+     */
+    public Message toMessage(Object object, Session session) throws JMSException {
+        if (object instanceof Message) {
+            return (Message)object;
+        } else if (object instanceof String) {
+            return session.createTextMessage((String)object);
+        } else if (object instanceof byte[]) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes((byte[])object);
+            return message;
+        } else if (object instanceof Serializable) {
+            return session.createObjectMessage((Serializable)object);
+        } else {
+            throw new IllegalArgumentException("Unsupported type " + nullSafeClassName(object) + "."
+                                               + " Valid types are: String, byte[], Serializable object.");
+        }
+    }
+
+    // Class name of the object for error messages, or "null" for a null reference.
+    private String nullSafeClassName(Object object) {
+        return object == null ? "null" : object.getClass().getName();
+    }
+
+    /**
+     * Extracts the payload of a JMS message.
+     *
+     * @param message the JMS message to read
+     * @return the text of a {@link TextMessage}, the bytes of a {@link BytesMessage} or the
+     *         object of an {@link ObjectMessage}
+     * @throws JMSException if the payload cannot be read, or if a bytes message body is too
+     *         large to fit into a single byte array
+     * @throws IllegalArgumentException if the message is {@code null} or of an unsupported type
+     */
+    public Object fromMessage(Message message) throws JMSException {
+        if (message instanceof TextMessage) {
+            return ((TextMessage)message).getText();
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bytesMessage = (BytesMessage)message;
+            long bodyLength = bytesMessage.getBodyLength();
+            // A plain (int) cast would silently truncate or turn negative for huge bodies.
+            if (bodyLength > Integer.MAX_VALUE) {
+                throw new JMSException("BytesMessage body of " + bodyLength
+                                       + " bytes is too large to be read into a byte array");
+            }
+            byte[] bytes = new byte[(int)bodyLength];
+            bytesMessage.readBytes(bytes);
+            return bytes;
+        } else if (message instanceof ObjectMessage) {
+            return ((ObjectMessage)message).getObject();
+        } else {
+            throw new IllegalArgumentException("Unsupported message type " + nullSafeClassName(message));
+        }
+    }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+ /**
+  * Sends JMS messages, optionally with explicit quality of service settings.
+  * <p>
+  * Delivery mode, priority and time to live are only passed to the producer
+  * when explicit QoS is enabled; otherwise the producer's own settings apply.
+  * All fields keep their Java defaults (false / 0) until a setter is called.
+  * NOTE(review): callers enabling explicit QoS presumably set all three values,
+  * as 0 does not look like a meaningful delivery mode - confirm.
+  */
+ public class JMSSender {
+     private boolean explicitQosEnabled;
+     private int deliveryMode;
+     private int priority;
+     private long timeToLive;
+
+     /**
+      * @param explicitQosEnabled true to send with the configured delivery mode,
+      *        priority and time to live
+      */
+     public void setExplicitQosEnabled(boolean explicitQosEnabled) {
+         this.explicitQosEnabled = explicitQosEnabled;
+     }
+
+     /**
+      * @param deliveryMode delivery mode used when explicit QoS is enabled
+      */
+     public void setDeliveryMode(int deliveryMode) {
+         this.deliveryMode = deliveryMode;
+     }
+
+     /**
+      * @param priority message priority used when explicit QoS is enabled
+      */
+     public void setPriority(int priority) {
+         this.priority = priority;
+     }
+
+     /**
+      * @param timeToLive time to live used when explicit QoS is enabled
+      */
+     public void setTimeToLive(long timeToLive) {
+         this.timeToLive = timeToLive;
+     }
+
+     /**
+      * Creates a producer for the target destination and sends the message with it.
+      * The producer is registered with the given closer, so it is closed together
+      * with the caller's other resources.
+      *
+      * @param closer receives the newly created producer for later cleanup
+      * @param session session used to create the producer
+      * @param targetDest destination the message is sent to
+      * @param message message to send
+      * @throws JMSException if creating the producer or sending fails
+      */
+     public void sendMessage(ResourceCloser closer, Session session, Destination targetDest,
+                             javax.jms.Message message) throws JMSException {
+         MessageProducer messageProducer = session.createProducer(targetDest);
+         closer.register(messageProducer);
+         if (!explicitQosEnabled) {
+             messageProducer.send(message);
+             return;
+         }
+         messageProducer.send(message, deliveryMode, priority, timeToLive);
+     }
+ }
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+ /**
+  * Static helpers for receiving correlated replies and building correlation ids.
+  */
+ public final class JMSUtil {
+     /** Zeros used to left-pad the hexadecimal counter of a correlation id. */
+     private static final char[] CORRELATION_ID_PADDING = {
+         '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'
+     };
+
+     private JMSUtil() {
+     }
+
+     /**
+      * Waits for the message whose JMSCorrelationID equals the given id.
+      * <p>
+      * A consumer with a matching message selector is created on the reply
+      * destination and closed again before this method returns.
+      *
+      * @param session session used to create the consumer
+      * @param replyToDestination destination to listen on
+      * @param correlationId correlation id the reply must carry
+      * @param receiveTimeout timeout handed to {@link MessageConsumer#receive(long)}
+      * @param pubSubNoLocal noLocal flag handed to the consumer
+      * @return the received reply, never null
+      * @throws RuntimeException if no message arrives in time or a JMSException occurs
+      */
+     public static Message receive(Session session,
+                                   Destination replyToDestination,
+                                   String correlationId,
+                                   long receiveTimeout,
+                                   boolean pubSubNoLocal) {
+         ResourceCloser closer = new ResourceCloser();
+         try {
+             // A quote inside a selector string literal must be doubled, otherwise an id
+             // containing ' would produce an invalid (or altered) selector.
+             // String.valueOf keeps the previous rendering of a null id as "null".
+             final String escapedId = String.valueOf(correlationId).replace("'", "''");
+             final String messageSelector = "JMSCorrelationID = '" + escapedId + "'";
+             MessageConsumer consumer = closer.register(
+                 session.createConsumer(replyToDestination, messageSelector, pubSubNoLocal));
+             javax.jms.Message replyMessage = consumer.receive(receiveTimeout);
+             if (replyMessage == null) {
+                 throw new RuntimeException("Timeout receiving message with correlationId "
+                                            + correlationId);
+             }
+             return replyMessage;
+         } catch (JMSException e) {
+             throw convertJmsException(e);
+         } finally {
+             closer.close();
+         }
+     }
+
+     /**
+      * Wraps a checked JMSException into a RuntimeException with the same message,
+      * keeping the original exception as cause.
+      *
+      * @param e the exception to wrap
+      * @return the unchecked wrapper, to be thrown by the caller
+      */
+     public static RuntimeException convertJmsException(JMSException e) {
+         return new RuntimeException(e.getMessage(), e);
+     }
+
+     /**
+      * Builds a correlation id from the prefix followed by the index as a
+      * zero-padded, 16 digit hexadecimal number.
+      *
+      * @param prefix text the id starts with
+      * @param i counter value to encode
+      * @return prefix plus 16 hexadecimal digits
+      */
+     public static String createCorrelationId(final String prefix, long i) {
+         String index = Long.toHexString(i);
+         StringBuilder id = new StringBuilder(prefix);
+         // The hex form has 1 to 16 digits, so at most 15 padding zeros are needed.
+         id.append(CORRELATION_ID_PADDING, 0, 16 - index.length());
+         id.append(index);
+         return id.toString();
+     }
+
+ }
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.AbstractSequentialList;
+import java.util.LinkedList;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+ /**
+  * Collects resources and closes them together, most recently registered first.
+  * <p>
+  * Failures while closing are ignored on purpose so that one failing resource
+  * does not prevent the remaining ones from being closed. After {@link #close()}
+  * the closer is empty, so calling it again does not close anything twice.
+  */
+ public class ResourceCloser implements Closeable {
+     private AbstractSequentialList<Closeable> resources;
+
+     public ResourceCloser() {
+         resources = new LinkedList<Closeable>();
+     }
+
+     /**
+      * Registers a closeable resource.
+      *
+      * @param resource the resource to close later
+      * @return the same resource, to allow inline registration
+      */
+     public <E extends Closeable> E register(E resource) {
+         // Insert at the head so that close() visits the newest resource first.
+         resources.add(0, resource);
+         return resource;
+     }
+
+     /**
+      * Registers a JMS connection.
+      *
+      * @param connection the connection to close later
+      * @return the same connection
+      */
+     public javax.jms.Connection register(final javax.jms.Connection connection) {
+         resources.add(0, new Closeable() {
+
+             @Override
+             public void close() throws IOException {
+                 try {
+                     connection.close();
+                 } catch (JMSException ignored) {
+                     // Best effort cleanup: nothing useful can be done here
+                 }
+             }
+         });
+         return connection;
+     }
+
+     /**
+      * Registers a JMS session.
+      *
+      * @param session the session to close later
+      * @return the same session
+      */
+     public Session register(final Session session) {
+         resources.add(0, new Closeable() {
+
+             @Override
+             public void close() throws IOException {
+                 try {
+                     session.close();
+                 } catch (JMSException ignored) {
+                     // Best effort cleanup: nothing useful can be done here
+                 }
+             }
+         });
+         return session;
+     }
+
+     /**
+      * Registers a JMS message consumer.
+      *
+      * @param consumer the consumer to close later
+      * @return the same consumer
+      */
+     public MessageConsumer register(final MessageConsumer consumer) {
+         resources.add(0, new Closeable() {
+
+             @Override
+             public void close() throws IOException {
+                 try {
+                     consumer.close();
+                 } catch (JMSException ignored) {
+                     // Best effort cleanup: nothing useful can be done here
+                 }
+             }
+         });
+         return consumer;
+     }
+
+     /**
+      * Registers a JMS message producer.
+      *
+      * @param producer the producer to close later
+      * @return the same producer
+      */
+     public MessageProducer register(final MessageProducer producer) {
+         resources.add(0, new Closeable() {
+
+             @Override
+             public void close() throws IOException {
+                 try {
+                     producer.close();
+                 } catch (JMSException ignored) {
+                     // Best effort cleanup: nothing useful can be done here
+                 }
+             }
+         });
+         return producer;
+     }
+
+     /**
+      * Closes all registered resources in reverse order of registration and
+      * forgets them afterwards. Exceptions thrown by individual resources are ignored.
+      */
+     @Override
+     public void close() {
+         for (Closeable resource : resources) {
+             try {
+                 resource.close();
+             } catch (Exception ignored) {
+                 // Keep going so the remaining resources are still closed
+             }
+         }
+         // Drop the references: a repeated close() must not close resources again
+         // and closed resources should not be kept alive by this closer.
+         resources.clear();
+     }
+
+ }
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+ /**
+  * Creates JMS sessions from a connection factory.
+  * <p>
+  * Every call to {@link #createSession()} opens a new connection. Connection and
+  * session are registered with the ResourceCloser given at construction time,
+  * which is then responsible for closing them.
+  */
+ public class SessionFactory {
+     private ConnectionFactory connectionFactory;
+     private ResourceCloser closer;
+     private boolean sessionTransacted;
+     private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+     private String durableSubscriptionClientId;
+
+     /**
+      * @param connectionFactory factory used to open connections
+      * @param closer receives every connection and session created by this factory
+      */
+     public SessionFactory(ConnectionFactory connectionFactory, ResourceCloser closer) {
+         this.connectionFactory = connectionFactory;
+         this.closer = closer;
+     }
+
+     /**
+      * @param connectionFactory factory used to open connections
+      */
+     public void setConnectionFactory(ConnectionFactory connectionFactory) {
+         this.connectionFactory = connectionFactory;
+     }
+
+     /**
+      * @param sessionTransacted whether created sessions are transacted
+      */
+     public void setSessionTransacted(boolean sessionTransacted) {
+         this.sessionTransacted = sessionTransacted;
+     }
+
+     /**
+      * @param acknowledgeMode acknowledge mode of created sessions,
+      *        {@link Session#AUTO_ACKNOWLEDGE} unless changed
+      */
+     public void setAcknowledgeMode(int acknowledgeMode) {
+         this.acknowledgeMode = acknowledgeMode;
+     }
+
+     /**
+      * @param durableSubscriptionClientId client id set on new connections, null to set none
+      */
+     public void setDurableSubscriptionClientId(String durableSubscriptionClientId) {
+         this.durableSubscriptionClientId = durableSubscriptionClientId;
+     }
+
+     /**
+      * Opens and starts a new connection and creates a session on it.
+      *
+      * @return the new session
+      * @throws JMSException if the connection or the session cannot be created
+      */
+     public Session createSession() throws JMSException {
+         final Connection jmsConnection = connectionFactory.createConnection();
+         // Register right away so the connection is cleaned up even if a later step fails.
+         closer.register(jmsConnection);
+         if (null != durableSubscriptionClientId) {
+             jmsConnection.setClientID(durableSubscriptionClientId);
+         }
+         jmsConnection.start();
+         final Session jmsSession = jmsConnection.createSession(sessionTransacted, acknowledgeMode);
+         return closer.register(jmsSession);
+     }
+
+ }
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java Wed Feb 5 21:58:23 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.util.concurrent.Executors;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+ /**
+  * Test helper that answers a single request arriving on a queue.
+  * <p>
+  * {@link #runAsync()} first empties the receive queue and then waits on a
+  * background thread for one message. The reply is a text message "Result"
+  * whose correlation id is the message id of the request.
+  */
+ public class TestReceiver {
+     private ConnectionFactory connectionFactory;
+     private String receiveQueueName;
+     private String requestMessageId;
+     private String staticReplyQueue;
+
+     /**
+      * @param connectionFactory factory used to connect to the broker
+      * @param receiveQueueName name of the queue requests are read from
+      */
+     public TestReceiver(ConnectionFactory connectionFactory, String receiveQueueName) {
+         this.connectionFactory = connectionFactory;
+         this.receiveQueueName = receiveQueueName;
+         assert this.connectionFactory != null;
+         assert this.receiveQueueName != null;
+     }
+
+     /**
+      * @return message id of the request that was received, null before one arrived
+      */
+     public String getRequestMessageId() {
+         return requestMessageId;
+     }
+
+     /**
+      * @param staticReplyQueue queue name to reply to instead of the request's JMSReplyTo
+      */
+     public void setStaticReplyQueue(String staticReplyQueue) {
+         this.staticReplyQueue = staticReplyQueue;
+     }
+
+     /**
+      * Reads and discards messages from the receive queue until a receive
+      * with a 100 ms timeout returns nothing.
+      */
+     private void drainQueue() {
+         ResourceCloser closer = new ResourceCloser();
+         try {
+             Session session = new SessionFactory(connectionFactory, closer).createSession();
+             MessageConsumer consumer = closer.register(
+                 session.createConsumer(session.createQueue(receiveQueueName)));
+             javax.jms.Message message = null;
+             do {
+                 message = consumer.receive(100);
+             } while (message != null);
+         } catch (JMSException e) {
+             throw JMSUtil.convertJmsException(e);
+         } finally {
+             closer.close();
+         }
+     }
+
+     /**
+      * Blocks until one message arrives, remembers its message id and sends the
+      * reply to the static reply queue or, if none is set, to the request's
+      * JMSReplyTo destination. No reply is sent if neither is available.
+      */
+     private void receiveAndRespondWithMessageIdAsCorrelationId() {
+         ResourceCloser closer = new ResourceCloser();
+         try {
+             Session session = new SessionFactory(connectionFactory, closer).createSession();
+             MessageConsumer consumer = closer.register(session.createConsumer(session
+                 .createQueue(receiveQueueName)));
+             final javax.jms.Message inMessage = consumer.receive();
+             requestMessageId = inMessage.getJMSMessageID();
+             System.out.println("Received message " + requestMessageId);
+             final TextMessage replyMessage = session.createTextMessage("Result");
+             replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID());
+             Destination replyDest = staticReplyQueue != null
+                 ? session.createQueue(staticReplyQueue) : inMessage.getJMSReplyTo();
+             if (replyDest != null) {
+                 final MessageProducer producer = closer
+                     .register(session.createProducer(replyDest));
+                 System.out.println("Sending reply to " + replyDest);
+                 producer.send(replyMessage);
+             }
+         } catch (JMSException e) {
+             throw JMSUtil.convertJmsException(e);
+         } finally {
+             closer.close();
+         }
+     }
+
+     /**
+      * Drains the receive queue on the calling thread and then starts a
+      * background thread that answers the next request.
+      */
+     public void runAsync() {
+         drainQueue();
+         final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+         try {
+             executor.execute(new Runnable() {
+                 public void run() {
+                     receiveAndRespondWithMessageIdAsCorrelationId();
+                 }
+             });
+         } finally {
+             // The submitted task still runs; shutdown only lets the worker thread
+             // end afterwards instead of leaking one idle thread per call.
+             executor.shutdown();
+         }
+     }
+ }
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Wed Feb 5 21:58:23 2014
@@ -51,11 +51,9 @@ public abstract class AbstractJMSTester
private static JMSBrokerSetup broker;
protected Bus bus;
- protected EndpointInfo endpointInfo;
protected EndpointReferenceType target;
protected MessageObserver observer;
protected Message inMessage;
- protected String wsdlURL;
public static void startBroker(JMSBrokerSetup b) throws Exception {
assertNotNull(b);
@@ -82,26 +80,32 @@ public abstract class AbstractJMSTester
if (System.getProperty("cxf.config.file") != null) {
System.clearProperty("cxf.config.file");
}
- wsdlURL = null;
}
- protected void setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
+ protected EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
URL wsdlUrl = getClass().getResource(wsdl);
- wsdlURL = wsdlUrl.toString();
+ String wsdlURL = wsdlUrl.toString();
assertNotNull(wsdlUrl);
EmbeddedJMSBrokerLauncher.updateWsdlExtensors(bus, wsdlURL);
WSDLServiceFactory factory = new WSDLServiceFactory(bus, wsdlURL, new QName(ns, serviceName));
Service service = factory.create();
- endpointInfo = service.getEndpointInfo(new QName(ns, portName));
+ return service.getEndpointInfo(new QName(ns, portName));
}
-
- protected void sendoutMessage(Conduit conduit, Message message, Boolean isOneWay) throws IOException {
+
+ protected void sendoutMessage(Conduit conduit, Message message, boolean isOneWay) throws IOException {
+ sendoutMessage(conduit, message, isOneWay, true);
+ }
+
+ protected void sendoutMessage(Conduit conduit,
+ Message message,
+ boolean isOneWay,
+ boolean synchronous) throws IOException {
Exchange exchange = new ExchangeImpl();
exchange.setOneWay(isOneWay);
- exchange.setSynchronous(true);
+ exchange.setSynchronous(synchronous);
message.setExchange(exchange);
exchange.setOutMessage(message);
try {
@@ -122,7 +126,7 @@ public abstract class AbstractJMSTester
}
}
- protected void adjustEndpointInfoURL() {
+ protected void adjustEndpointInfoURL(EndpointInfo endpointInfo) {
if (endpointInfo != null) {
AddressType at = endpointInfo.getExtensor(AddressType.class);
if (at != null) {
@@ -138,21 +142,16 @@ public abstract class AbstractJMSTester
}
}
- protected JMSConduit setupJMSConduit(boolean send, boolean decoupled) throws IOException {
- if (decoupled) {
- // setup the reference type
- } else {
- target = EasyMock.createMock(EndpointReferenceType.class);
- }
-
- adjustEndpointInfoURL();
+ protected JMSConduit setupJMSConduit(EndpointInfo ei, boolean send) throws IOException {
+ target = EasyMock.createMock(EndpointReferenceType.class);
+ adjustEndpointInfoURL(ei);
JMSConfiguration jmsConfig = new JMSOldConfigHolder()
- .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, null, true);
+ .createJMSConfigurationFromEndpointInfo(bus, ei, null, true);
if (jmsConfig != null && jmsConfig.getReceiveTimeout() == null) {
jmsConfig.setReceiveTimeout(5000L);
}
- JMSConduit jmsConduit = new JMSConduit(endpointInfo, target, jmsConfig, bus);
+ JMSConduit jmsConduit = new JMSConduit(target, jmsConfig, bus);
if (send) {
// setMessageObserver
observer = new MessageObserver() {
@@ -165,5 +164,12 @@ public abstract class AbstractJMSTester
return jmsConduit;
}
+
+ protected JMSDestination setupJMSDestination(EndpointInfo ei) throws IOException {
+ adjustEndpointInfoURL(ei);
+ JMSConfiguration jmsConfig = new JMSOldConfigHolder()
+ .createJMSConfigurationFromEndpointInfo(bus, ei, null, false);
+ return new JMSDestination(bus, ei, jmsConfig);
+ }
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSBrokerSetup.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSBrokerSetup.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSBrokerSetup.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSBrokerSetup.java Wed Feb 5 21:58:23 2014
@@ -102,11 +102,4 @@ public class JMSBrokerSetup {
}
-
- /*class ContainerWapper extends BrokerContainerImpl {
-
- public void shutdown() {
- super.containerShutdown();
- }
- }*/
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Wed Feb 5 21:58:23 2014
@@ -31,17 +31,18 @@ import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.cxf.BusFactory;
import org.apache.cxf.bus.spring.SpringBusFactory;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.SessionCallback;
public class JMSConduitTest extends AbstractJMSTester {
@@ -59,9 +60,9 @@ public class JMSConduitTest extends Abst
BusFactory.setDefaultBus(null);
bus = bf.createBus("/jms_test_config.xml");
BusFactory.setDefaultBus(bus);
- setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
"HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
- JMSConduit conduit = setupJMSConduit(false, false);
+ JMSConduit conduit = setupJMSConduit(ei, false);
assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
.getReceiveTimeout().longValue());
bus.shutdown(false);
@@ -71,35 +72,17 @@ public class JMSConduitTest extends Abst
@Test
public void testPrepareSend() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
- JMSConduit conduit = setupJMSConduit(false, false);
+ JMSConduit conduit = setupJMSConduit(ei, false);
Message message = new MessageImpl();
- try {
- conduit.prepare(message);
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- verifySentMessage(false, message);
- }
-
- private void verifySentMessage(boolean send, Message message) {
+ conduit.prepare(message);
OutputStream os = message.getContent(OutputStream.class);
Writer writer = message.getContent(Writer.class);
assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null);
}
- /*
- * @Test public void testSendOut() throws Exception {
- * setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
- * "HelloWorldServiceLoop", "HelloWorldPortLoop"); JMSConduit conduit = setupJMSConduit(true, false);
- * conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(10000)); try { for (int c = 0; c < 10; c++) {
- * LOG.info("Sending message " + c); inMessage = null; Message message = new MessageImpl();
- * sendoutMessage(conduit, message, false); verifyReceivedMessage(message); } } finally { conduit.close();
- * } }
- */
-
/**
* Sends several messages and verifies the results. The service sends the message to itself. So it should
* always receive the result
@@ -108,17 +91,17 @@ public class JMSConduitTest extends Abst
*/
@Test
public void testTimeoutOnReceive() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldServiceLoop", "HelloWorldPortLoop");
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
// TODO IF the system is extremely fast. The message could still get through
conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(1));
Message message = new MessageImpl();
try {
sendoutMessage(conduit, message, false);
verifyReceivedMessage(message);
- throw new RuntimeException("Expected a timeout here");
+ fail("Expected a timeout here");
} catch (RuntimeException e) {
LOG.info("Received exception. This is expected");
} finally {
@@ -153,30 +136,22 @@ public class JMSConduitTest extends Abst
}
@Test
- public void testJMSMessageMarshal() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
- "HelloWorldServiceLoop", "HelloWorldPortLoop");
+ public void testJMSMessageMarshal() throws IOException, JMSException {
String testMsg = "Test Message";
- JMSConduit conduit = setupJMSConduit(true, false);
- Message msg = new MessageImpl();
- conduit.prepare(msg);
final byte[] testBytes = testMsg.getBytes(Charset.defaultCharset().name()); // TODO encoding
- JMSConfiguration jmsConfig = conduit.getJmsConfig();
- JmsTemplate jmsTemplate = new JmsTemplate();
- jmsTemplate.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory());
- SessionCallback<javax.jms.Message> sc = new SessionCallback<javax.jms.Message>() {
- public javax.jms.Message doInJms(Session session) throws JMSException {
- return JMSUtils.createAndSetPayload(testBytes, session, JMSConstants.BYTE_MESSAGE_TYPE);
- }
- };
- javax.jms.Message message = jmsTemplate.execute(sc);
+ JMSConfiguration jmsConfig = new JMSConfiguration();
+ jmsConfig.setConnectionFactory(new ActiveMQConnectionFactory("vm://tesstMarshal?broker.persistent=false"));
+
+ ResourceCloser closer = new ResourceCloser();
+ try {
+ Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
+ javax.jms.Message jmsMessage =
+ JMSMessageUtils.createAndSetPayload(testBytes, session, JMSConstants.BYTE_MESSAGE_TYPE);
+ assertTrue("Message should have been of type BytesMessage ", jmsMessage instanceof BytesMessage);
+ } finally {
+ closer.close();
+ }
- // The ibm jdk finalizes conduit (during most runs of this test) and
- // causes it to fail unless we reference the conduit here after the
- // jmsTemplate.execute() call.
- assertNotNull("Conduit is null", conduit);
-
- assertTrue("Message should have been of type BytesMessage ", message instanceof BytesMessage);
}
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Wed Feb 5 21:58:23 2014
@@ -27,6 +27,7 @@ import java.io.StringReader;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
@@ -39,10 +40,12 @@ import org.apache.cxf.message.ExchangeIm
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.security.SecurityContext;
+import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.MultiplexDestination;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
public class JMSDestinationTest extends AbstractJMSTester {
@@ -86,57 +89,34 @@ public class JMSDestinationTest extends
+ " seconds", destMessage != null);
}
- public JMSDestination setupJMSDestination(boolean send) throws IOException {
-
- adjustEndpointInfoURL();
- JMSConfiguration jmsConfig = new JMSOldConfigHolder()
- .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, null, false);
-
- JMSDestination jmsDestination = new JMSDestination(bus, endpointInfo, jmsConfig);
-
- if (send) {
- // setMessageObserver
- observer = new MessageObserver() {
- public void onMessage(Message m) {
- Exchange exchange = new ExchangeImpl();
- exchange.setInMessage(m);
- m.setExchange(exchange);
- destMessage = m;
- }
- };
- jmsDestination.setMessageObserver(observer);
- }
- return jmsDestination;
+ protected MessageObserver createMessageObserver() {
+ return new MessageObserver() {
+ public void onMessage(Message m) {
+ Exchange exchange = new ExchangeImpl();
+ exchange.setInMessage(m);
+ m.setExchange(exchange);
+ destMessage = m;
+ }
+ };
}
-
@Test
public void testGetConfigurationFromSpring() throws Exception {
SpringBusFactory bf = new SpringBusFactory();
BusFactory.setDefaultBus(null);
bus = bf.createBus("/jms_test_config.xml");
BusFactory.setDefaultBus(bus);
- setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
"HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
- JMSDestination destination = setupJMSDestination(false);
+ JMSDestination destination = setupJMSDestination(ei);
JMSConfiguration jmsConfig = destination.getJmsConfig();
- //JmsTemplate jmsTemplate = destination.getJmsTemplate();
- //AbstractMessageListenerContainer jmsListener = destination.getJmsListener();
assertEquals("Can't get the right ServerConfig's MessageTimeToLive ", 500L, jmsConfig
.getTimeToLive());
assertEquals("Can't get the right Server's MessageSelector", "cxf_message_selector", jmsConfig
.getMessageSelector());
- // assertEquals("Can't get the right SessionPoolConfig's LowWaterMark", 10,
- // jmsListener.getLowWaterMark());
- // assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "testPassword",
- // .getConnectionPassword());
assertEquals("Can't get the right DurableSubscriberName", "cxf_subscriber", jmsConfig
.getDurableSubscriptionName());
- /*setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
- "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
- destination = setupJMSDestination(false);
- jmsConfig = destination.getJmsConfig();*/
assertEquals("The receiveTimeout should be set", jmsConfig.getReceiveTimeout().longValue(), 1500L);
assertEquals("The concurrentConsumer should be set", jmsConfig.getConcurrentConsumers(), 3);
assertEquals("The maxConcurrentConsumer should be set", jmsConfig.getMaxConcurrentConsumers(), 5);
@@ -146,8 +126,8 @@ public class JMSDestinationTest extends
jmsConfig.isAcceptMessagesWhileStopping());
assertNotNull("The connectionFactory should not be null", jmsConfig.getConnectionFactory());
assertTrue("Should get the instance of ActiveMQConnectionFactory",
- jmsConfig.getConnectionFactory() instanceof ActiveMQConnectionFactory);
- ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory)jmsConfig.getConnectionFactory();
+ jmsConfig.getPlainConnectionFactory() instanceof ActiveMQConnectionFactory);
+ ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory)jmsConfig.getPlainConnectionFactory();
assertEquals("The borker URL is wrong", cf.getBrokerURL(), "tcp://localhost:61500");
assertEquals("Get a wrong TargetDestination", jmsConfig.getTargetDestination(), "queue:test");
assertEquals("Get the wrong pubSubDomain value", jmsConfig.isPubSubDomain(), false);
@@ -162,10 +142,10 @@ public class JMSDestinationTest extends
BusFactory.setDefaultBus(null);
bus = bf.createBus();
BusFactory.setDefaultBus(bus);
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
- JMSDestination destination = setupJMSDestination(false);
+ JMSDestination destination = setupJMSDestination(ei);
assertEquals("Can't get the right DurableSubscriberName", "CXF_subscriber", destination
.getJmsConfig().getDurableSubscriptionName());
@@ -185,12 +165,13 @@ public class JMSDestinationTest extends
bus = bf.createBus("jms_test_config.xml");
BusFactory.setDefaultBus(bus);
destMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldPubSubService", "HelloWorldPubSubPort");
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
- JMSDestination destination = setupJMSDestination(true);
+ JMSDestination destination = setupJMSDestination(ei);
+ destination.setMessageObserver(createMessageObserver());
// The JMSBroker (ActiveMQ 5.x) need to take some time to setup the DurableSubscriber
Thread.sleep(2000);
sendoutMessage(conduit, outMessage, true);
@@ -209,11 +190,12 @@ public class JMSDestinationTest extends
@Test
public void testOneWayDestination() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
- JMSDestination destination = setupJMSDestination(true);
+ JMSDestination destination = setupJMSDestination(ei);
+ destination.setMessageObserver(createMessageObserver());
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
@@ -229,22 +211,24 @@ public class JMSDestinationTest extends
}
@Test
+ @Ignore
public void testOneWayReplyToSetUnset() throws Exception {
/* 1. Test that replyTo destination set in WSDL is NOT used
* in spec compliant mode */
destMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
+ System.out.println(conduit.getJmsConfig().getReplyDestination());
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
- JMSDestination destination = setupJMSDestination(true);
+ JMSDestination destination = setupJMSDestination(ei);
+ destination.setMessageObserver(createMessageObserver());
sendoutMessage(conduit, outMessage, true);
- // wait for the message to be get from the destination
waitForReceiveDestMessage();
// just verify the Destination inMessage
- assertTrue("The destiantion should have got the message ", destMessage != null);
+ assertTrue("The destination should have got the message ", destMessage != null);
verifyReplyToNotSet(destMessage);
destMessage = null;
@@ -254,10 +238,8 @@ public class JMSDestinationTest extends
conduit.getJmsConfig().setEnforceSpec(false);
sendoutMessage(conduit, outMessage, true);
waitForReceiveDestMessage();
- assertTrue("The destiantion should have got the message ", destMessage != null);
- String exName = conduit.getJmsConfig().getReplyDestination();
- exName = (exName.indexOf('/') != -1 && exName.indexOf('/') < exName.length())
- ? exName.substring(exName.indexOf('/') + 1) : exName;
+ assertTrue("The destination should have got the message ", destMessage != null);
+ String exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
verifyReplyToSet(destMessage, Queue.class, exName);
destMessage = null;
@@ -293,9 +275,7 @@ public class JMSDestinationTest extends
sendoutMessage(conduit, outMessage, true);
waitForReceiveDestMessage();
assertTrue("The destiantion should have got the message ", destMessage != null);
- exName = conduit.getJmsConfig().getReplyDestination();
- exName = (exName.indexOf('/') != -1 && exName.indexOf('/') < exName.length())
- ? exName.substring(exName.indexOf('/') + 1) : exName;
+ exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
verifyReplyToSet(destMessage, Queue.class, exName);
destMessage = null;
@@ -303,6 +283,14 @@ public class JMSDestinationTest extends
destination.shutdown();
}
+ /**
+ * Returns the text after the first '/' in exName; null or slash-free input is returned unchanged.
+ */
+ private String getQueueName(String exName) {
+ int slash = exName == null ? -1 : exName.indexOf('/');
+ return slash == -1 ? exName : exName.substring(slash + 1);
+ }
+
protected void verifyReplyToNotSet(Message cxfMsg) {
javax.jms.Message jmsMsg =
@@ -310,23 +298,26 @@ public class JMSDestinationTest extends
assertNotNull("JMS Messsage must be null", jmsMsg);
}
+ /** Returns the queue name when dest is a Queue; any other destination is cast to Topic. */
+ private String getDestinationName(Destination dest) throws JMSException {
+ // Non-Queue destinations are cast to Topic without a prior check, so other types fail on the cast.
+ return dest instanceof Queue
+ ? ((Queue)dest).getQueueName()
+ : ((Topic)dest).getTopicName();
+ }
+
protected void verifyReplyToSet(Message cxfMsg,
Class<? extends Destination> type,
- String name) throws Exception {
+ String expectedName) throws Exception {
javax.jms.Message jmsMsg =
javax.jms.Message.class.cast(cxfMsg.get(JMSConstants.JMS_REQUEST_MESSAGE));
assertNotNull("JMS Messsage must not be null", jmsMsg);
assertNotNull("JMS Messsage's replyTo must not be null", jmsMsg.getJMSReplyTo());
assertTrue("JMS Messsage's replyTo type must be of type " + type.getName(),
type.isAssignableFrom(jmsMsg.getJMSReplyTo().getClass()));
- String receivedName = null;
- if (type == Queue.class) {
- receivedName = ((Queue)jmsMsg.getJMSReplyTo()).getQueueName();
- } else if (type == Topic.class) {
- receivedName = ((Topic)jmsMsg.getJMSReplyTo()).getTopicName();
- }
- assertTrue("JMS Messsage's replyTo must be named " + name + " but was " + receivedName,
- name == receivedName || receivedName.equals(name));
+ String receivedName = getDestinationName(jmsMsg.getJMSReplyTo());
+ assertEquals("JMS Messsage's replyTo must be named " + expectedName + " but was " + receivedName,
+ expectedName, receivedName); // null-safe, unlike receivedName.equals(expectedName)
}
private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
@@ -417,14 +408,13 @@ public class JMSDestinationTest extends
@Test
public void testRoundTripDestination() throws Exception {
-
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
// set up the conduit send to be true
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage, null);
- final JMSDestination destination = setupJMSDestination(false);
+ final JMSDestination destination = setupJMSDestination(ei);
// set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
@@ -473,10 +463,10 @@ public class JMSDestinationTest extends
final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
// set up the conduit send to be true
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage, null);
@@ -488,7 +478,7 @@ public class JMSDestinationTest extends
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
headers.getProperty().add(excludeProp);
- final JMSDestination destination = setupJMSDestination(false);
+ final JMSDestination destination = setupJMSDestination(ei);
// set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
@@ -537,20 +527,22 @@ public class JMSDestinationTest extends
@Test
public void testIsMultiplexCapable() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
- final JMSDestination destination = setupJMSDestination(true);
+ final JMSDestination destination = setupJMSDestination(ei);
+ destination.setMessageObserver(createMessageObserver());
assertTrue("is multiplex", destination instanceof MultiplexDestination);
destination.shutdown();
}
@Test
public void testSecurityContext() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldService", "HelloWorldPort");
- final JMSDestination destination = setupJMSDestination(true);
+ final JMSDestination destination = setupJMSDestination(ei);
+ destination.setMessageObserver(createMessageObserver());
// set up the conduit send to be true
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage, null);
sendoutMessage(conduit, outMessage, true);
@@ -565,11 +557,12 @@ public class JMSDestinationTest extends
@Test
public void testGetSpringSingleConnectionFactoryFromWSDL() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
"HelloWorldServiceSpringICF", "HelloWorldPortSpringICF");
- final JMSDestination destination = setupJMSDestination(true);
+ final JMSDestination destination = setupJMSDestination(ei);
+ destination.setMessageObserver(createMessageObserver());
// set up the conduit send to be true
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(ei, true);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage, null);
sendoutMessage(conduit, outMessage, true);