You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2008/09/24 10:26:32 UTC
svn commit: r698460 - in /cxf/trunk/rt/transports/jms: ./
src/main/java/org/apache/cxf/transport/jms/
src/main/java/org/apache/cxf/transport/jms/spring/
src/test/java/org/apache/cxf/transport/jms/ src/test/resources/
Author: ningjiang
Date: Wed Sep 24 01:26:31 2008
New Revision: 698460
URL: http://svn.apache.org/viewvc?rev=698460&view=rev
Log:
CXF-1806 applied patch with thanks to Christian
Added:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (with props)
Modified:
cxf/trunk/rt/transports/jms/pom.xml
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
Modified: cxf/trunk/rt/transports/jms/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/pom.xml?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/pom.xml (original)
+++ cxf/trunk/rt/transports/jms/pom.xml Wed Sep 24 01:26:31 2008
@@ -91,6 +91,11 @@
<artifactId>activemq-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
</dependencies>
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Wed Sep 24 01:26:31 2008
@@ -28,20 +28,22 @@
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.naming.Context;
+import javax.jms.QueueSession;
+import javax.jms.Session;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
-import org.apache.cxf.configuration.Configurer;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.support.destination.DestinationResolver;
/**
* JMSConduit is instantiated by the JMSTransportfactory which is selected by a client if the transport
@@ -55,21 +57,11 @@
static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
- protected Destination targetDestination;
- protected JMSSessionFactory sessionFactory;
protected Bus bus;
protected EndpointInfo endpointInfo;
+ protected JMSConfiguration jmsConfig;
protected String beanNameSuffix;
- protected ClientConfig clientConfig;
- protected ClientBehaviorPolicyType runtimePolicy;
- protected AddressType address;
- protected SessionPoolType sessionPool;
-
- private Queue replyDestination;
-
- private Context context;
-
public JMSConduit(Bus b, EndpointInfo endpointInfo) {
this(b, endpointInfo, null);
}
@@ -79,54 +71,38 @@
this.bus = b;
this.endpointInfo = endpointInfo;
this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
- initConfig();
- }
-
- private void initConfig() {
- this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
- this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
- this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
- this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
- ClientBehaviorPolicyType.class);
-
- Configurer configurer = bus.getExtension(Configurer.class);
- if (null != configurer) {
- configurer.configureBean(this);
- }
- }
-
- public JMSSessionFactory getOrCreateSessionFactory() {
- if (this.sessionFactory == null) {
- try {
- this.context = JMSUtils.getInitialContext(address);
- this.sessionFactory = JMSSessionFactory
- .connect(getJMSAddress(), getSessionPool(), null);
- this.targetDestination = JMSUtils.resolveRequestDestination(sessionFactory
- .getInitialContext(), sessionFactory.getConnection(), address);
- this.replyDestination = JMSUtils.resolveReplyDestination(context, sessionFactory
- .getConnection(), address);
- } catch (Exception jmsex) {
- throw new RuntimeException("JMS connect failed: ", jmsex);
- }
- }
- if (this.targetDestination == null) {
- throw new RuntimeException("Failed to lookup or create requestDestination");
- }
- return this.sessionFactory;
}
// prepare the message for send out , not actually send out the message
public void prepare(Message message) throws IOException {
- if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
- throw new RuntimeException("Insufficient configuration for Conduit. "
- + "Did you configure a <jms:conduit name=\""
- + getBeanName() + "\"> and set the jndiConnectionFactoryName ?");
- }
- message.setContent(OutputStream.class, new JMSOutputStream(this,
- message.getExchange(), isTextPayload()));
+ message.setContent(OutputStream.class, new JMSOutputStream(this, message.getExchange(),
+ isTextPayload()));
// After this step flow will continue in JMSOutputStream.doClose()
}
+ public Destination determineReplyToDestination(final JmsTemplate jmsTemplate,
+ final String replyToDestinationName,
+ final boolean pubSubDomain, boolean isOneWay) {
+ if (isOneWay) {
+ return null;
+ }
+ return (Destination)jmsTemplate.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ if (replyToDestinationName == null) {
+ if (session instanceof QueueSession) {
+ // For JMS 1.0.2
+ return ((QueueSession)session).createTemporaryQueue();
+ } else {
+ // For JMS 1.1
+ return session.createTemporaryQueue();
+ }
+ }
+ DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+ return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
+ }
+ });
+ }
+
/**
* Send the JMS Request out and if not oneWay receive the response
*
@@ -134,115 +110,61 @@
* @param request
* @return inMessage
*/
- public void sendExchange(Exchange exchange, Object request) {
+ public void sendExchange(final Exchange exchange, final Object request) {
LOG.log(Level.FINE, "JMSConduit send message");
-
- sessionFactory = getOrCreateSessionFactory();
- PooledSession pooledSession = null;
- try {
- pooledSession = sessionFactory.get();
- Destination replyTo = null;
- if (!exchange.isOneWay()) {
- pooledSession.initConsumerAndReplyDestination(replyDestination);
- replyTo = pooledSession.getReplyDestination();
- }
-
- // TODO setting up the responseExpected
-
- // We don't want to send temp queue in
- // replyTo header for oneway calls
- if (exchange.isOneWay() && (getJMSAddress().getJndiReplyDestinationName() == null)) {
- replyTo = null;
- }
- Message outMessage = exchange.getOutMessage();
- if (outMessage == null) {
- throw new RuntimeException("Exchange to be sent has no outMessage");
- }
- sendMessage(outMessage, request, pooledSession, replyTo);
-
- if (!exchange.isOneWay()) {
- long receiveTimeout = clientConfig.getClientReceiveTimeout();
- Long messageReceiveTimeout = (Long)exchange.getOutMessage()
- .get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
- if (messageReceiveTimeout != null) {
- receiveTimeout = messageReceiveTimeout.longValue();
- }
- Message inMessage = receiveResponse(pooledSession.consumer(), receiveTimeout);
- exchange.setInMessage(inMessage);
- incomingObserver.onMessage(inMessage);
- }
- } finally {
- sessionFactory.recycle(pooledSession);
- }
- }
-
- private void sendMessage(Message outMessage, Object request, PooledSession pooledSession,
- Destination replyTo) {
- try {
- String messageType = runtimePolicy.getMessageType().value();
- javax.jms.Message jmsMessage;
- jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType,
- pooledSession.session(), replyTo,
- pooledSession.getCorrelationID());
-
- // Retrieve JMS QoS parameters from CXF message headers
- JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
- .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- long ttl = JMSUtils.getTimeToLive(headers);
- if (ttl <= 0) {
- ttl = clientConfig.getMessageTimeToLive();
+ final JmsTemplate jmsTemplate = jmsConfig.getJmsTemplate();
+ final Destination replyTo = determineReplyToDestination(jmsTemplate,
+ jmsConfig.getReplyDestination(), jmsConfig
+ .isPubSubDomain(), exchange.isOneWay());
+ final Message outMessage = exchange.getOutMessage();
+ if (outMessage == null) {
+ throw new RuntimeException("Exchange to be sent has no outMessage");
+ }
+
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) ? headers
+ .getJMSCorrelationID() : JMSUtils.generateUniqueSelector();
+ String selector = "JMSCorrelationID = '" + correlationId + "'";
+
+ // TODO This is not thread safe
+ jmsTemplate.setPriority(JMSUtils.getJMSPriority(headers));
+ jmsTemplate.send(jmsConfig.getTargetDestination(), new MessageCreator() {
+ public javax.jms.Message createMessage(Session session) throws JMSException {
+ String messageType = jmsConfig.getMessageType();
+ final javax.jms.Message jmsMessage;
+ jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType,
+ session, replyTo, correlationId);
+ LOG.log(Level.FINE, "client sending request: ", jmsMessage);
+ return jmsMessage;
}
- int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
- int priority = JMSUtils.getJMSPriority(headers);
+ });
- LOG.log(Level.FINE, "client sending request: ", jmsMessage);
- JMSUtils.sendMessage(pooledSession.producer(), targetDestination, jmsMessage, ttl, deliveryMode,
- priority);
- } catch (JMSException e) {
- throw new RuntimeException("Problem while sending JMS message", e);
- }
- }
-
- private Message receiveResponse(MessageConsumer consumer, long receiveTimeout) {
- // TODO if outMessage need to get the response
- try {
- Message inMessage = new MessageImpl();
- // set the message header back to the incomeMessage
- // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
- // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
-
- byte[] response = null;
- javax.jms.Message jmsMessage = consumer.receive(receiveTimeout);
+ if (!exchange.isOneWay()) {
+ javax.jms.Message jmsMessage = jmsTemplate.receiveSelected(replyTo, selector);
if (jmsMessage == null) {
- // TODO: Review what exception should we throw.
- throw new JMSException("JMS receive timed out");
+ throw new RuntimeException("JMS receive timed out");
}
+ Message inMessage = new MessageImpl();
LOG.log(Level.FINE, "client received reply: ", jmsMessage);
- JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- response = JMSUtils.retrievePayload(jmsMessage);
+ JMSUtils
+ .populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+ byte[] response = JMSUtils.retrievePayload(jmsMessage);
LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
-
- // setup the inMessage response stream
inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
- LOG.log(Level.FINE, "incoming observer is " + incomingObserver);
-
- return inMessage;
- } catch (JMSException e) {
- throw new RuntimeException("Problem while receiving JMS message", e);
+ exchange.setInMessage(inMessage);
+ if (incomingObserver != null) {
+ incomingObserver.onMessage(inMessage);
+ }
}
-
}
private boolean isTextPayload() {
- return JMSConstants.TEXT_MESSAGE_TYPE.equals(runtimePolicy.getMessageType().value());
+ return JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
}
public void close() {
- getLogger().log(Level.FINE, "JMSConduit closed ");
- // ensure resources held by session factory are released
- if (sessionFactory != null) {
- sessionFactory.shutdown();
- }
+ LOG.log(Level.FINE, "JMSConduit closed ");
}
protected Logger getLogger() {
@@ -253,35 +175,12 @@
return endpointInfo.getName().toString() + ".jms-conduit";
}
- public AddressType getJMSAddress() {
- return address;
- }
-
- public void setJMSAddress(AddressType a) {
- this.address = a;
+ public JMSConfiguration getJmsConfig() {
+ return jmsConfig;
}
- public ClientConfig getClientConfig() {
- return clientConfig;
+ public void setJmsConfig(JMSConfiguration jmsConfig) {
+ this.jmsConfig = jmsConfig;
}
- public void setClientConfig(ClientConfig clientConfig) {
- this.clientConfig = clientConfig;
- }
-
- public ClientBehaviorPolicyType getRuntimePolicy() {
- return runtimePolicy;
- }
-
- public void setRuntimePolicy(ClientBehaviorPolicyType runtimePolicy) {
- this.runtimePolicy = runtimePolicy;
- }
-
- public SessionPoolType getSessionPool() {
- return sessionPool;
- }
-
- public void setSessionPool(SessionPoolType sessionPool) {
- this.sessionPool = sessionPool;
- }
}
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=698460&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Wed Sep 24 01:26:31 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.AbstractJmsListeningContainer;
+
+public class JMSConfiguration implements InitializingBean {
+ private ConnectionFactory connectionFactory;
+ private JmsTemplate jmsTemplate;
+ private AbstractJmsListeningContainer jmsListener;
+ private String targetDestination;
+ private String replyDestination;
+ private String messageType;
+ private boolean pubSubDomain;
+
+ public JMSConfiguration() {
+ targetDestination = null;
+ replyDestination = null;
+ messageType = JMSConstants.TEXT_MESSAGE_TYPE;
+ pubSubDomain = false;
+ }
+
+ public void afterPropertiesSet() throws Exception {
+ /*
+ * if (connectionFactory == null) { throw new RuntimeException("Required property connectionfactory
+ * was not set"); } jmsTemplate.setConnectionFactory(connectionFactory);
+ * jmsListener.setConnectionFactory(connectionFactory);
+ */
+ }
+
+ public JmsTemplate getJmsTemplate() {
+ return jmsTemplate;
+ }
+
+ @Required
+ public void setJmsTemplate(JmsTemplate jmsTemplate) {
+ this.jmsTemplate = jmsTemplate;
+ }
+
+ public AbstractJmsListeningContainer getJmsListener() {
+ return jmsListener;
+ }
+
+ @Required
+ public void setJmsListener(AbstractJmsListeningContainer jmsListener) {
+ this.jmsListener = jmsListener;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ @Required
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public String getTargetDestination() {
+ return targetDestination;
+ }
+
+ public void setTargetDestination(String targetDestination) {
+ this.targetDestination = targetDestination;
+ }
+
+ public String getReplyDestination() {
+ return replyDestination;
+ }
+
+ public void setReplyDestination(String replyDestination) {
+ this.replyDestination = replyDestination;
+ }
+
+ public String getMessageType() {
+ return messageType;
+ }
+
+ public void setMessageType(String messageType) {
+ this.messageType = messageType;
+ }
+
+ public boolean isPubSubDomain() {
+ return pubSubDomain;
+ }
+
+ public void setPubSubDomain(boolean pubSubDomain) {
+ this.pubSubDomain = pubSubDomain;
+ }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Wed Sep 24 01:26:31 2008
@@ -124,8 +124,7 @@
Executor executor = SynchronousExecutor.getInstance();
if (wqm != null) {
if (name != null) {
- executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
- + name.getLocalPart());
+ executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" + name.getLocalPart());
}
if (executor == null) {
executor = wqm.getNamedWorkQueue("jms");
@@ -199,19 +198,24 @@
}
}
- public void setReplyCorrelationID(javax.jms.Message request, javax.jms.Message reply)
- throws JMSException {
-
+ /**
+ * Decides what correlationId to use for the reply by looking at the request headers. If the request has a
+ * correlationId set this is taken. Else if the useMessageIDAsCorrelationID is true then the messageId
+ * from the request message is used as correlation Id
+ *
+ * @param request
+ * @return
+ * @throws JMSException
+ */
+ public String determineCorrelationID(javax.jms.Message request) throws JMSException {
String correlationID = request.getJMSCorrelationID();
-
- if (correlationID == null || "".equals(correlationID)
- && getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
- correlationID = request.getJMSMessageID();
+ if ("".equals(correlationID)) {
+ correlationID = null;
}
-
- if (correlationID != null && !"".equals(correlationID)) {
- reply.setJMSCorrelationID(correlationID);
+ if (correlationID == null && getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
+ correlationID = request.getJMSMessageID();
}
+ return correlationID;
}
/**
@@ -241,8 +245,6 @@
// handle the incoming message
incomingObserver.onMessage(inMessage);
- } catch (JMSException e) {
- throw new RuntimeException("Error handling JMS message", e);
} finally {
BusFactory.setThreadDefaultBus(null);
}
@@ -276,12 +278,12 @@
javax.jms.Message reply = JMSUtils
.createAndSetPayload(replyObj, replySession.session(), msgType);
- setReplyCorrelationID(request, reply);
+ reply.setJMSCorrelationID(determineCorrelationID(request));
JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
JMSUtils.setMessageProperties(headers, reply);
// ensure that the contentType is set to the out jms message header
- JMSUtils.setContentToProtocolHeader(outMessage);
+ JMSUtils.addContentTypeToProtocolHeader(outMessage);
Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
.get(Message.PROTOCOL_HEADERS));
JMSUtils.addProtocolHeaders(reply, protHeaders);
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=698460&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Wed Sep 24 01:26:31 2008
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.naming.NamingException;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.springframework.jms.connection.SingleConnectionFactory;
+import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.destination.JndiDestinationResolver;
+import org.springframework.jndi.JndiTemplate;
+
+public class JMSOldConfigHolder {
+ protected ClientConfig clientConfig;
+ protected ClientBehaviorPolicyType runtimePolicy;
+ protected AddressType address;
+ protected SessionPoolType sessionPool;
+
+ private ConnectionFactory getConnectionFactoryFromJndi(String connectionFactoryName, String userName,
+ String password, JndiTemplate jt) {
+ if (connectionFactoryName == null) {
+ return null;
+ }
+ try {
+
+ ConnectionFactory connectionFactory = (ConnectionFactory)jt.lookup(connectionFactoryName);
+ UserCredentialsConnectionFactoryAdapter uccf = new UserCredentialsConnectionFactoryAdapter();
+ uccf.setUsername(userName);
+ uccf.setPassword(password);
+ uccf.setTargetConnectionFactory(connectionFactory);
+
+ SingleConnectionFactory scf = new SingleConnectionFactory();
+ scf.setTargetConnectionFactory(uccf);
+ return scf;
+ } catch (NamingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo) {
+ JMSConfiguration jmsConf = new JMSConfiguration();
+
+ // Retrieve configuration information that was extracted from the wsdl
+ address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+ clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
+ runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
+ ClientBehaviorPolicyType.class);
+
+ // Try to retrieve configuration information from the spring
+ // config. Search for a tag <jms:conduit> with name=endpoint name + ".jms-conduit"
+ Configurer configurer = bus.getExtension(Configurer.class);
+ if (null != configurer) {
+ String name = endpointInfo.getName().toString() + ".jms-conduit";
+ configurer.configureBean(name, this);
+ }
+
+ JndiTemplate jt = new JndiTemplate();
+ jt.setEnvironment(JMSUtils.getInitialContextEnv(address));
+ ConnectionFactory cf = getConnectionFactoryFromJndi(address.getJndiConnectionFactoryName(), address
+ .getConnectionUserName(), address.getConnectionPassword(), jt);
+
+ // TODO Use JmsTemplate102 in case JMS 1.1 is not available
+ JmsTemplate jmsTemplate = new JmsTemplate();
+ jmsTemplate.setConnectionFactory(cf);
+ boolean pubSubDomain = false;
+ if (address.isSetDestinationStyle()) {
+ pubSubDomain = DestinationStyleType.TOPIC == address.getDestinationStyle();
+ }
+ jmsTemplate.setPubSubDomain(pubSubDomain);
+ jmsTemplate.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
+ jmsTemplate.setTimeToLive(clientConfig.getMessageTimeToLive());
+ jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
+ jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+ jmsTemplate.setExplicitQosEnabled(true);
+
+ if (address.isSetJndiDestinationName()) {
+ // Setup Destination jndi destination resolver
+ final JndiDestinationResolver jndiDestinationResolver = new JndiDestinationResolver();
+ jndiDestinationResolver.setJndiTemplate(jt);
+ jmsTemplate.setDestinationResolver(jndiDestinationResolver);
+ jmsConf.setTargetDestination(address.getJndiDestinationName());
+ jmsConf.setReplyDestination(address.getJndiReplyDestinationName());
+ } else {
+ // Use the default dynamic destination resolver
+ jmsConf.setTargetDestination(address.getJmsDestinationName());
+ jmsConf.setReplyDestination(address.getJmsReplyDestinationName());
+ }
+ if (runtimePolicy.isSetMessageType()) {
+ jmsConf.setMessageType(runtimePolicy.getMessageType().value());
+ }
+
+ jmsConf.setJmsTemplate(jmsTemplate);
+ return jmsConf;
+ }
+
+ public ClientConfig getClientConfig() {
+ return clientConfig;
+ }
+
+ public void setClientConfig(ClientConfig clientConfig) {
+ this.clientConfig = clientConfig;
+ }
+
+ public ClientBehaviorPolicyType getRuntimePolicy() {
+ return runtimePolicy;
+ }
+
+ public void setRuntimePolicy(ClientBehaviorPolicyType runtimePolicy) {
+ this.runtimePolicy = runtimePolicy;
+ }
+
+ public AddressType getAddress() {
+ return address;
+ }
+
+ public void setAddress(AddressType address) {
+ this.address = address;
+ }
+
+ public SessionPoolType getSessionPool() {
+ return sessionPool;
+ }
+
+ public void setSessionPool(SessionPoolType sessionPool) {
+ this.sessionPool = sessionPool;
+ }
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java Wed Sep 24 01:26:31 2008
@@ -19,6 +19,7 @@
package org.apache.cxf.transport.jms;
+import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,6 +33,7 @@
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
+import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.cxf.common.logging.LogUtils;
@@ -152,7 +154,8 @@
ServerConfig serverConfig) throws JMSException,
NamingException {
- Context context = JMSUtils.getInitialContext(addrDetails);
+ Properties env = JMSUtils.getInitialContextEnv(addrDetails);
+ Context context = new InitialContext(env);
ConnectionFactory connectionFactory;
Connection connection = null;
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Wed Sep 24 01:26:31 2008
@@ -35,8 +35,8 @@
import org.apache.cxf.transport.DestinationFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-public class JMSTransportFactory extends AbstractTransportFactory
- implements ConduitInitiator, DestinationFactory {
+public class JMSTransportFactory extends AbstractTransportFactory implements ConduitInitiator,
+ DestinationFactory {
private static final Set<String> URI_PREFIXES = new HashSet<String>();
static {
@@ -44,6 +44,7 @@
}
private Bus bus;
+ private JMSConfiguration jmsConfig;
@Resource(name = "cxf")
public void setBus(Bus b) {
@@ -59,8 +60,11 @@
}
public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException {
- JMSConduit conduit =
- target == null ? new JMSConduit(bus, endpointInfo) : new JMSConduit(bus, endpointInfo, target);
+ JMSConduit conduit = target == null
+ ? new JMSConduit(bus, endpointInfo) : new JMSConduit(bus, endpointInfo, target);
+ JMSOldConfigHolder old = new JMSOldConfigHolder();
+ JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo);
+ conduit.setJmsConfig(jmsConf);
return conduit;
}
@@ -72,8 +76,16 @@
}
return destination;
}
-
+
public Set<String> getUriPrefixes() {
return URI_PREFIXES;
}
+
+ public JMSConfiguration getJmsConfig() {
+ return jmsConfig;
+ }
+
+ public void setJmsConfig(JMSConfiguration jmsConfig) {
+ this.jmsConfig = jmsConfig;
+ }
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Wed Sep 24 01:26:31 2008
@@ -19,7 +19,10 @@
package org.apache.cxf.transport.jms;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -39,15 +42,16 @@
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.naming.Context;
-import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
+import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.converter.MessageConversionException;
+import org.springframework.jms.support.converter.SimpleMessageConverter102;
public final class JMSUtils {
@@ -57,7 +61,7 @@
}
- public static Context getInitialContext(AddressType addrType) throws NamingException {
+ public static Properties getInitialContextEnv(AddressType addrType) {
Properties env = new Properties();
populateContextEnvironment(addrType, env);
@@ -71,9 +75,7 @@
}
}
- Context context = new InitialContext(env);
-
- return context;
+ return env;
}
protected static void populateContextEnvironment(AddressType addrType, Properties env) {
@@ -114,14 +116,6 @@
return ttl;
}
- public static String getCorrelationId(JMSMessageHeadersType headers) {
- String correlationId = null;
- if (headers != null && headers.isSetJMSCorrelationID()) {
- correlationId = headers.getJMSCorrelationID();
- }
- return correlationId;
- }
-
public static void setMessageProperties(JMSMessageHeadersType headers, Message message)
throws JMSException {
@@ -165,62 +159,71 @@
* @param message the incoming message
* @return the message payload as byte[]
*/
- public static byte[] retrievePayload(Message message) throws JMSException {
- byte[] ret = null;
-
- if (message instanceof TextMessage) {
- ret = ((TextMessage)message).getText().getBytes();
- } else if (message instanceof BytesMessage) {
- ret = new byte[(int)((BytesMessage)message).getBodyLength()];
- ((BytesMessage)message).readBytes(ret);
+ public static byte[] retrievePayload(Message message) {
+ Object converted;
+ try {
+ converted = new SimpleMessageConverter102().fromMessage(message);
+ } catch (MessageConversionException e) {
+ throw new RuntimeException("Conversion failed", e);
+ } catch (JMSException e) {
+ throw JmsUtils.convertJmsAccessException(e);
+ }
+ if (converted instanceof String) {
+ return ((String)converted).getBytes(); // TODO encoding
+ } else if (converted instanceof byte[]) {
+ return (byte[])converted;
} else {
- ret = (byte[])((ObjectMessage)message).getObject();
+ return (byte[])converted; // TODO is this correct?
}
- return ret;
}
public static JMSMessageHeadersType populateIncomingContext(javax.jms.Message message,
org.apache.cxf.message.Message inMessage,
- String headerType) throws JMSException {
- JMSMessageHeadersType headers = null;
-
- headers = (JMSMessageHeadersType)inMessage.get(headerType);
+ String headerType) {
+ try {
+ JMSMessageHeadersType headers = null;
+ headers = (JMSMessageHeadersType)inMessage.get(headerType);
+ if (headers == null) {
+ headers = new JMSMessageHeadersType();
+ inMessage.put(headerType, headers);
+ }
+ headers.setJMSCorrelationID(message.getJMSCorrelationID());
+ headers.setJMSDeliveryMode(new Integer(message.getJMSDeliveryMode()));
+ headers.setJMSExpiration(new Long(message.getJMSExpiration()));
+ headers.setJMSMessageID(message.getJMSMessageID());
+ headers.setJMSPriority(new Integer(message.getJMSPriority()));
+ headers.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
+ headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
+ headers.setJMSType(message.getJMSType());
- if (headers == null) {
- headers = new JMSMessageHeadersType();
- inMessage.put(headerType, headers);
- }
+ Map<String, List<String>> protHeaders = new HashMap<String, List<String>>();
+ List<JMSPropertyType> props = headers.getProperty();
+ Enumeration enm = message.getPropertyNames();
+ while (enm.hasMoreElements()) {
+ String name = (String)enm.nextElement();
+ String val = message.getStringProperty(name);
+ JMSPropertyType prop = new JMSPropertyType();
+ prop.setName(name);
+ prop.setValue(val);
+ props.add(prop);
+
+ protHeaders.put(name, Collections.singletonList(val));
+ if (name.equals(org.apache.cxf.message.Message.CONTENT_TYPE)
+ || name.equals(JMSConstants.JMS_CONTENT_TYPE) && val != null) {
+ inMessage.put(org.apache.cxf.message.Message.CONTENT_TYPE, val);
+ }
- headers.setJMSCorrelationID(message.getJMSCorrelationID());
- headers.setJMSDeliveryMode(new Integer(message.getJMSDeliveryMode()));
- headers.setJMSExpiration(new Long(message.getJMSExpiration()));
- headers.setJMSMessageID(message.getJMSMessageID());
- headers.setJMSPriority(new Integer(message.getJMSPriority()));
- headers.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
- headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
- headers.setJMSType(message.getJMSType());
-
- Map<String, List<String>> protHeaders = new HashMap<String, List<String>>();
- List<JMSPropertyType> props = headers.getProperty();
- Enumeration enm = message.getPropertyNames();
- while (enm.hasMoreElements()) {
- String name = (String)enm.nextElement();
- String val = message.getStringProperty(name);
- JMSPropertyType prop = new JMSPropertyType();
- prop.setName(name);
- prop.setValue(val);
- props.add(prop);
-
- protHeaders.put(name, Collections.singletonList(val));
- if (name.equals(org.apache.cxf.message.Message.CONTENT_TYPE)
- || name.equals(JMSConstants.JMS_CONTENT_TYPE)
- && val != null) {
- inMessage.put(org.apache.cxf.message.Message.CONTENT_TYPE, val);
+ protHeaders.put(name, Collections.singletonList(val));
+ if (name.equals(org.apache.cxf.message.Message.CONTENT_TYPE)
+ || name.equals(JMSConstants.JMS_CONTENT_TYPE) && val != null) {
+ inMessage.put(org.apache.cxf.message.Message.CONTENT_TYPE, val);
+ }
}
-
+ inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders);
+ return headers;
+ } catch (JMSException ex) {
+ throw JmsUtils.convertJmsAccessException(ex);
}
- inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders);
- return headers;
}
protected static void addProtocolHeaders(Message message, Map<String, List<String>> headers)
@@ -239,13 +242,13 @@
value.append(s);
first = false;
}
- //Incase if the Content-Type header key is Content-Type replace with JMS_Content_Type
+ // Incase if the Content-Type header key is Content-Type replace with JMS_Content_Type
if (entry.getKey().equals(org.apache.cxf.message.Message.CONTENT_TYPE)) {
message.setStringProperty(JMSConstants.JMS_CONTENT_TYPE, value.toString());
} else {
- message.setStringProperty(entry.getKey(), value.toString());
+ message.setStringProperty(entry.getKey(), value.toString());
}
-
+
}
}
@@ -259,7 +262,7 @@
return headers;
}
- public static void setContentToProtocolHeader(org.apache.cxf.message.Message message) {
+ public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) {
String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message);
@@ -272,7 +275,7 @@
ct = new ArrayList<String>();
headers.put(JMSConstants.JMS_CONTENT_TYPE, ct);
}
-
+
ct.add(contentType);
}
@@ -293,33 +296,19 @@
JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- String correlationID = JMSUtils.getCorrelationId(headers);
+ if (headers == null) {
+ headers = new JMSMessageHeadersType();
+ // throw new RuntimeException("No JMS_CLIENT_REQUEST_HEADERS set in message");
+ }
JMSUtils.setMessageProperties(headers, jmsMessage);
// ensure that the contentType is set to the out jms message header
- JMSUtils.setContentToProtocolHeader(outMessage);
+ JMSUtils.addContentTypeToProtocolHeader(outMessage);
Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
.get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
JMSUtils.addProtocolHeaders(jmsMessage, protHeaders);
- if (!outMessage.getExchange().isOneWay()) {
- String id = correlationId;
-
- if (id != null) {
- if (correlationID != null) {
- String error = "User cannot set JMSCorrelationID when "
- + "making a request/reply invocation using " + "a static replyTo Queue.";
- throw new JMSException(error);
- }
- correlationID = id;
- }
- }
-
- if (correlationID != null) {
- jmsMessage.setJMSCorrelationID(correlationID);
- } else {
- // No message correlation id is set. Whatever comeback will be accepted as responses.
- // We assume that it will only happen in case of the temp. reply queue.
- }
+ jmsMessage.setJMSCorrelationID(correlationId);
+ jmsMessage.setJMSPriority(JMSUtils.getJMSPriority(headers));
return jmsMessage;
}
@@ -364,7 +353,7 @@
}
public static Queue resolveReplyDestination(Context context, Connection connection,
- AddressType addrDetails) throws NamingException,
+ AddressType addrDetails) throws NamingException,
JMSException {
Queue replyDestination = null;
@@ -382,4 +371,18 @@
}
return replyDestination;
}
+
+ public static String generateUniqueSelector() {
+ String host = "localhost";
+
+ try {
+ InetAddress addr = InetAddress.getLocalHost();
+ host = addr.getHostName();
+ } catch (UnknownHostException ukex) {
+ // Default to localhost.
+ }
+
+ long time = Calendar.getInstance().getTimeInMillis();
+ return host + "_" + System.getProperty("user.name") + "_" + Thread.currentThread() + time;
+ }
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java Wed Sep 24 01:26:31 2008
@@ -19,10 +19,6 @@
package org.apache.cxf.transport.jms;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Calendar;
-
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -83,19 +79,7 @@
return theProducer;
}
- private String generateUniqueSelector() {
- String host = "localhost";
-
- try {
- InetAddress addr = InetAddress.getLocalHost();
- host = addr.getHostName();
- } catch (UnknownHostException ukex) {
- // Default to localhost.
- }
- long time = Calendar.getInstance().getTimeInMillis();
- return host + "_" + System.getProperty("user.name") + "_" + this + time;
- }
MessageConsumer consumer() {
return theConsumer;
@@ -113,7 +97,7 @@
String selector = null;
if (null != destination) {
replyDestination = destination;
- selector = "JMSCorrelationID = '" + generateUniqueSelector() + "'";
+ selector = "JMSCorrelationID = '" + JMSUtils.generateUniqueSelector() + "'";
} else {
replyDestination = theSession.createTemporaryQueue();
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java Wed Sep 24 01:26:31 2008
@@ -42,7 +42,7 @@
ClientConfig.class);
mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "runtimePolicy"), "runtimePolicy",
ClientBehaviorPolicyType.class);
- mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "JMSAddress",
+ mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "address"), "address",
AddressType.class);
mapElementToJaxbProperty(element, bean, new QName(JMS_NS, "sessionPool"), "sessionPool",
SessionPoolType.class);
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Wed Sep 24 01:26:31 2008
@@ -43,32 +43,32 @@
public abstract class AbstractJMSTester extends Assert {
private static JMSBrokerSetup broker;
-
+
protected Bus bus;
protected EndpointInfo endpointInfo;
protected EndpointReferenceType target;
protected MessageObserver observer;
protected Message inMessage;
-
+
public static void startBroker(JMSBrokerSetup b) throws Exception {
assertNotNull(b);
broker = b;
broker.start();
}
-
- @AfterClass
+
+ @AfterClass
public static void stopBroker() throws Exception {
broker.stop();
broker = null;
}
-
+
@Before
public void setUp() {
BusFactory bf = BusFactory.newInstance();
bus = bf.createBus();
BusFactory.setDefaultBus(bus);
}
-
+
@After
public void tearDown() {
bus.shutdown(true);
@@ -76,19 +76,19 @@
System.clearProperty("cxf.config.file");
}
}
-
- protected void setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
+
+ protected void setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
URL wsdlUrl = getClass().getResource(wsdl);
assertNotNull(wsdlUrl);
WSDLServiceFactory factory = new WSDLServiceFactory(bus, wsdlUrl, new QName(ns, serviceName));
- Service service = factory.create();
+ Service service = factory.create();
endpointInfo = service.getEndpointInfo(new QName(ns, portName));
-
+
}
-
+
protected void sendoutMessage(Conduit conduit, Message message, Boolean isOneWay) throws IOException {
-
+
Exchange exchange = new ExchangeImpl();
exchange.setOneWay(isOneWay);
message.setExchange(exchange);
@@ -97,37 +97,36 @@
conduit.prepare(message);
} catch (IOException ex) {
assertFalse("JMSConduit can't perpare to send out message", false);
- ex.printStackTrace();
+ ex.printStackTrace();
}
OutputStream os = message.getContent(OutputStream.class);
assertTrue("The OutputStream should not be null ", os != null);
- os.write("HelloWorld".getBytes());
- os.close();
+ os.write("HelloWorld".getBytes()); // TODO encoding
+ os.close();
}
-
+
protected JMSConduit setupJMSConduit(boolean send, boolean decoupled) {
if (decoupled) {
// setup the reference type
} else {
target = EasyMock.createMock(EndpointReferenceType.class);
- }
-
+ }
+
JMSConduit jmsConduit = new JMSConduit(bus, endpointInfo, target);
-
+ JMSConfiguration jmsConfig = new JMSOldConfigHolder()
+ .createJMSConfigurationFromEndpointInfo(bus, endpointInfo);
+ jmsConduit.setJmsConfig(jmsConfig);
if (send) {
// setMessageObserver
observer = new MessageObserver() {
- public void onMessage(Message m) {
+ public void onMessage(Message m) {
inMessage = m;
}
};
jmsConduit.setMessageObserver(observer);
}
-
- return jmsConduit;
+
+ return jmsConduit;
}
-
-
-
-
+
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Wed Sep 24 01:26:31 2008
@@ -19,13 +19,14 @@
package org.apache.cxf.transport.jms;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Session;
import org.apache.cxf.BusFactory;
import org.apache.cxf.bus.spring.SpringBusFactory;
@@ -34,16 +35,16 @@
import org.apache.cxf.message.MessageImpl;
import org.junit.BeforeClass;
import org.junit.Test;
-
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.SessionCallback;
public class JMSConduitTest extends AbstractJMSTester {
-
-
+
@BeforeClass
public static void createAndStartBroker() throws Exception {
startBroker(new JMSBrokerSetup("tcp://localhost:61500"));
}
-
+
@Test
public void testGetConfiguration() throws Exception {
// setup the new bus to get the configuration file
@@ -51,64 +52,50 @@
BusFactory.setDefaultBus(null);
bus = bf.createBus("/jms_test_config.xml");
BusFactory.setDefaultBus(bus);
- setupServiceInfo("http://cxf.apache.org/jms_conf_test",
- "/wsdl/others/jms_test_no_addr.wsdl",
- "HelloWorldQueueBinMsgService",
- "HelloWorldQueueBinMsgPort");
+ setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
+ "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSConduit conduit = setupJMSConduit(false, false);
- assertEquals("Can't get the right ClientReceiveTimeout",
- 500L,
- conduit.getClientConfig().getClientReceiveTimeout());
- assertEquals("Can't get the right SessionPoolConfig's LowWaterMark",
- 10,
- conduit.getSessionPool().getLowWaterMark());
- assertEquals("Can't get the right AddressPolicy's ConnectionPassword",
- "testPassword",
- conduit.getJMSAddress().getConnectionPassword());
+ assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
+ .getJmsTemplate().getReceiveTimeout());
bus.shutdown(false);
BusFactory.setDefaultBus(null);
-
+
}
-
+
@Test
public void testPrepareSend() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldService",
- "HelloWorldPort");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldService", "HelloWorldPort");
JMSConduit conduit = setupJMSConduit(false, false);
Message message = new MessageImpl();
try {
conduit.prepare(message);
} catch (Exception ex) {
- ex.printStackTrace();
+ ex.printStackTrace();
}
- verifySentMessage(false, message);
+ verifySentMessage(false, message);
}
-
+
public void verifySentMessage(boolean send, Message message) {
OutputStream os = message.getContent(OutputStream.class);
- assertTrue("OutputStream should not be null", os != null);
+ assertTrue("OutputStream should not be null", os != null);
}
-
+
@Test
public void testSendOut() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldServiceLoop",
- "HelloWorldPortLoop");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldServiceLoop", "HelloWorldPortLoop");
- JMSConduit conduit = setupJMSConduit(true, false);
+ JMSConduit conduit = setupJMSConduit(true, false);
Message message = new MessageImpl();
// set the isOneWay to false
- sendoutMessage(conduit, message, false);
- verifyReceivedMessage(message);
+ sendoutMessage(conduit, message, false);
+ verifyReceivedMessage(message);
}
-
- public void verifyReceivedMessage(Message message) {
- ByteArrayInputStream bis =
- (ByteArrayInputStream) inMessage.getContent(InputStream.class);
+
+ public void verifyReceivedMessage(Message message) {
+ ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
byte bytes[] = new byte[bis.available()];
try {
bis.read(bytes);
@@ -117,38 +104,38 @@
}
String reponse = IOUtils.newStringFromBytes(bytes);
assertEquals("The reponse date should be equals", reponse, "HelloWorld");
-
- JMSMessageHeadersType inHeader =
- (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-
+
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
+ .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+
assertTrue("The inMessage JMS Header should not be null", inHeader != null);
-
-
+
}
-
+
@Test
public void testJMSMessageMarshal() throws Exception {
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldServiceLoop",
- "HelloWorldPortLoop");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldServiceLoop", "HelloWorldPortLoop");
- String testMsg = "Test Message";
- JMSConduit conduit = setupJMSConduit(true, false);
+ String testMsg = "Test Message";
+ JMSConduit conduit = setupJMSConduit(true, false);
Message msg = new MessageImpl();
conduit.prepare(msg);
- PooledSession sess = conduit.getOrCreateSessionFactory().get();
- byte [] b = testMsg.getBytes();
- javax.jms.Message message = JMSUtils.createAndSetPayload(b,
- sess.session(),
- JMSConstants.BYTE_MESSAGE_TYPE);
-
- assertTrue("Message should have been of type BytesMessage ",
- message instanceof BytesMessage);
-// byte[] returnBytes = new byte[(int)((BytesMessage) message).getBodyLength()];
-// ((BytesMessage) message).readBytes(returnBytes);
-// assertTrue("Message marshalled was incorrect",
-// testMsg.equals(new String(returnBytes)));
+ final byte[] b = testMsg.getBytes(); // TODO encoding
+ JmsTemplate jmsTemplate = conduit.getJmsConfig().getJmsTemplate();
+ javax.jms.Message message = (javax.jms.Message)jmsTemplate.execute(new SessionCallback() {
+
+ public Object doInJms(Session session) throws JMSException {
+ return JMSUtils.createAndSetPayload(b, session, JMSConstants.BYTE_MESSAGE_TYPE);
+ }
+
+ });
+
+ assertTrue("Message should have been of type BytesMessage ", message instanceof BytesMessage);
+ // byte[] returnBytes = new byte[(int)((BytesMessage) message).getBodyLength()];
+ // ((BytesMessage) message).readBytes(returnBytes);
+ // assertTrue("Message marshalled was incorrect",
+ // testMsg.equals(new String(returnBytes)));
}
-
+
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Wed Sep 24 01:26:31 2008
@@ -39,6 +39,7 @@
import org.junit.Test;
public class JMSDestinationTest extends AbstractJMSTester {
+ private static final int MAX_RECEIVE_TIME = 3;
private Message destMessage;
@BeforeClass
@@ -48,15 +49,16 @@
private void waitForReceiveInMessage() {
int waitTime = 0;
- while (inMessage == null && waitTime < 3000) {
+ while (inMessage == null && waitTime < MAX_RECEIVE_TIME) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// do nothing here
}
- waitTime = waitTime + 1000;
+ waitTime++;
}
- assertTrue("Can't receive the Conduit Message in 3 seconds", inMessage != null);
+ assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds",
+ inMessage != null);
}
private void waitForReceiveDestMessage() {
@@ -72,8 +74,6 @@
assertTrue("Can't receive the Destination message in 3 seconds", destMessage != null);
}
-
-
public JMSDestination setupJMSDestination(boolean send) throws IOException {
ConduitInitiator conduitInitiator = EasyMock.createMock(ConduitInitiator.class);
JMSDestination jmsDestination = new JMSDestination(bus, conduitInitiator, endpointInfo);
@@ -98,29 +98,21 @@
BusFactory.setDefaultBus(null);
bus = bf.createBus("/jms_test_config.xml");
BusFactory.setDefaultBus(bus);
- setupServiceInfo("http://cxf.apache.org/jms_conf_test",
- "/wsdl/others/jms_test_no_addr.wsdl",
- "HelloWorldQueueBinMsgService",
- "HelloWorldQueueBinMsgPort");
+ setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
+ "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSDestination destination = setupJMSDestination(false);
- assertEquals("Can't get the right ServerConfig's MessageTimeToLive ",
- 500L,
- destination.getServerConfig().getMessageTimeToLive());
- assertEquals("Can't get the right Server's MessageSelector",
- "cxf_message_selector",
- destination.getRuntimePolicy().getMessageSelector());
- assertEquals("Can't get the right SessionPoolConfig's LowWaterMark",
- 10,
- destination.getSessionPool().getLowWaterMark());
- assertEquals("Can't get the right AddressPolicy's ConnectionPassword",
- "testPassword",
- destination.getJMSAddress().getConnectionPassword());
- assertEquals("Can't get the right DurableSubscriberName",
- "cxf_subscriber",
- destination.getRuntimePolicy().getDurableSubscriberName());
- assertEquals("Can't get the right MessageSelectorName",
- "cxf_message_selector",
- destination.getRuntimePolicy().getMessageSelector());
+ assertEquals("Can't get the right ServerConfig's MessageTimeToLive ", 500L, destination
+ .getServerConfig().getMessageTimeToLive());
+ assertEquals("Can't get the right Server's MessageSelector", "cxf_message_selector", destination
+ .getRuntimePolicy().getMessageSelector());
+ assertEquals("Can't get the right SessionPoolConfig's LowWaterMark", 10, destination
+ .getSessionPool().getLowWaterMark());
+ assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "testPassword", destination
+ .getJMSAddress().getConnectionPassword());
+ assertEquals("Can't get the right DurableSubscriberName", "cxf_subscriber", destination
+ .getRuntimePolicy().getDurableSubscriberName());
+ assertEquals("Can't get the right MessageSelectorName", "cxf_message_selector", destination
+ .getRuntimePolicy().getMessageSelector());
BusFactory.setDefaultBus(null);
}
@@ -131,20 +123,17 @@
BusFactory.setDefaultBus(null);
bus = bf.createBus();
BusFactory.setDefaultBus(bus);
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldQueueBinMsgService",
- "HelloWorldQueueBinMsgPort");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSDestination destination = setupJMSDestination(false);
- assertEquals("Can't get the right DurableSubscriberName",
- "CXF_subscriber",
- destination.getRuntimePolicy().getDurableSubscriberName());
+ assertEquals("Can't get the right DurableSubscriberName", "CXF_subscriber", destination
+ .getRuntimePolicy().getDurableSubscriberName());
assertEquals("Can't get the right AddressPolicy's ConnectionPassword",
- "dynamicQueues/test.jmstransport.binary",
- destination.getJMSAddress().getJndiDestinationName());
+ "dynamicQueues/test.jmstransport.binary", destination.getJMSAddress()
+ .getJndiDestinationName());
BusFactory.setDefaultBus(null);
@@ -154,25 +143,17 @@
public void testDurableSubscriber() throws Exception {
SpringBusFactory bf = new SpringBusFactory();
BusFactory.setDefaultBus(null);
- bus = bf.createBus("/wsdl/jms_test_config.xml");
+ bus = bf.createBus("jms_test_config.xml");
BusFactory.setDefaultBus(bus);
destMessage = null;
inMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldPubSubService",
- "HelloWorldPubSubPort");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConduit conduit = setupJMSConduit(true, false);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
- JMSDestination destination = null;
- try {
- destination = setupJMSDestination(true);
- destination.activate();
- } catch (IOException e) {
- assertFalse("The JMSDestination activate should not through exception ", false);
- e.printStackTrace();
- }
+ JMSDestination destination = setupJMSDestination(true);
+ // destination.activate();
sendoutMessage(conduit, outMessage, true);
// wait for the message to be get from the destination
waitForReceiveDestMessage();
@@ -187,10 +168,8 @@
public void testOneWayDestination() throws Exception {
destMessage = null;
inMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HWStaticReplyQBinMsgService",
- "HWStaticReplyQBinMsgPort");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
JMSConduit conduit = setupJMSConduit(true, false);
Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
@@ -222,8 +201,7 @@
}
private void verifyReceivedMessage(Message inMessage) {
- ByteArrayInputStream bis =
- (ByteArrayInputStream) inMessage.getContent(InputStream.class);
+ ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
byte bytes[] = new byte[bis.available()];
try {
bis.read(bytes);
@@ -236,54 +214,49 @@
}
private void verifyRequestResponseHeaders(Message inMessage, Message outMessage) {
- JMSMessageHeadersType outHeader =
- (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- JMSMessageHeadersType inHeader =
- (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
+ .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
verifyJmsHeaderEquality(outHeader, inHeader);
}
private void verifyHeaders(Message inMessage, Message outMessage) {
- JMSMessageHeadersType outHeader =
- (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- JMSMessageHeadersType inHeader =
- (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
+ .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
verifyJmsHeaderEquality(outHeader, inHeader);
}
private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
- assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals",
- outHeader.getJMSCorrelationID(), inHeader.getJMSCorrelationID());
- assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals",
- outHeader.getJMSPriority(), inHeader.getJMSPriority());
- assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals",
- outHeader.getJMSType(), inHeader.getJMSType());
-
+ assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader
+ .getJMSCorrelationID(), inHeader.getJMSCorrelationID());
+ assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
+ .getJMSPriority(), inHeader.getJMSPriority());
+ assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
+ .getJMSType(), inHeader.getJMSType());
}
-
-
@Test
public void testRoundTripDestination() throws Exception {
inMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldService",
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl", "HelloWorldService",
"HelloWorldPort");
- //set up the conduit send to be true
+ // set up the conduit send to be true
JMSConduit conduit = setupJMSConduit(true, false);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
final JMSDestination destination = setupJMSDestination(true);
- //set up MessageObserver for handling the conduit message
+ // set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
Exchange exchange = new ExchangeImpl();
@@ -291,23 +264,22 @@
m.setExchange(exchange);
verifyReceivedMessage(m);
verifyHeaders(m, outMessage);
- //setup the message for
+ // setup the message for
Conduit backConduit;
try {
backConduit = destination.getBackChannel(m, null, null);
- //wait for the message to be got from the conduit
+ // wait for the message to be got from the conduit
Message replyMessage = new MessageImpl();
sendoutMessage(backConduit, replyMessage, true);
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
};
destination.setMessageObserver(observer);
- //set is oneway false for get response from destination
+ // set is oneway false for get response from destination
sendoutMessage(conduit, outMessage, false);
- //wait for the message to be got from the destination,
+ // wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
waitForReceiveInMessage();
@@ -331,11 +303,9 @@
"THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
inMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldService",
- "HelloWorldPort");
- //set up the conduit send to be true
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldService", "HelloWorldPort");
+ // set up the conduit send to be true
JMSConduit conduit = setupJMSConduit(true, false);
final Message outMessage = new MessageImpl();
setupMessageHeader(outMessage);
@@ -344,14 +314,13 @@
excludeProp.setName(customPropertyName);
excludeProp.setValue(customPropertyName);
- JMSMessageHeadersType headers = (JMSMessageHeadersType)
- outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
headers.getProperty().add(excludeProp);
-
final JMSDestination destination = setupJMSDestination(true);
- //set up MessageObserver for handling the conduit message
+ // set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
Exchange exchange = new ExchangeImpl();
@@ -359,38 +328,36 @@
m.setExchange(exchange);
verifyReceivedMessage(m);
verifyHeaders(m, outMessage);
- //setup the message for
+ // setup the message for
Conduit backConduit;
try {
backConduit = destination.getBackChannel(m, null, null);
- //wait for the message to be got from the conduit
+ // wait for the message to be got from the conduit
Message replyMessage = new MessageImpl();
sendoutMessage(backConduit, replyMessage, true);
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
};
destination.setMessageObserver(observer);
- //set is oneway false for get response from destination
+ // set is oneway false for get response from destination
sendoutMessage(conduit, outMessage, false);
- //wait for the message to be got from the destination,
+ // wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
waitForReceiveInMessage();
verifyReceivedMessage(inMessage);
-
verifyRequestResponseHeaders(inMessage, outMessage);
- JMSMessageHeadersType inHeader =
- (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
+ .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- assertTrue("property has been excluded, only CONTENT_TYPE should be here",
- inHeader.getProperty().size() == 1);
+ assertTrue("property has been excluded, only CONTENT_TYPE should be here", inHeader.getProperty()
+ .size() == 1);
assertTrue("property has been excluded, only " + JMSConstants.JMS_CONTENT_TYPE + "should be here",
- inHeader.getProperty().get(0).getName().equals(JMSConstants.JMS_CONTENT_TYPE));
+ inHeader.getProperty().get(0).getName().equals(JMSConstants.JMS_CONTENT_TYPE));
// wait for a while for the jms session recycling
Thread.sleep(1000);
destination.shutdown();
@@ -399,10 +366,8 @@
@Test
public void testIsMultiplexCapable() throws Exception {
inMessage = null;
- setupServiceInfo("http://cxf.apache.org/hello_world_jms",
- "/wsdl/jms_test.wsdl",
- "HelloWorldService",
- "HelloWorldPort");
+ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
+ "HelloWorldService", "HelloWorldPort");
final JMSDestination destination = setupJMSDestination(true);
assertTrue("is multiplex", destination instanceof MultiplexDestination);
}
Modified: cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml?rev=698460&r1=698459&r2=698460&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml (original)
+++ cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml Wed Sep 24 01:26:31 2008
@@ -33,13 +33,13 @@
<jms:sessionPool lowWaterMark="10" highWaterMark="5000"/>
<jms:address
destinationStyle="queue"
- jndiConnectionFactoryName="MockConnectionFactory"
+ jndiConnectionFactoryName="ConnectionFactory"
jndiDestinationName="myOwnDestination"
jndiReplyDestinationName="myOwnReplyDestination"
connectionUserName="testUser"
connectionPassword="testPassword">
- <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.cxf.transport.jms.MockInitialContextFactory"/>
- <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61616"/>
+ <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
+ <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
</jms:address>
</jms:conduit>
@@ -63,7 +63,7 @@
connectionUserName="testUser"
connectionPassword="testPassword">
<jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.cxf.transport.jms.MockInitialContextFactory"/>
- <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61616"/>
+ <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
</jms:address>
</jms:destination>
@@ -77,5 +77,33 @@
lowWaterMark="10"
highWaterMark="5000"/>
</jms:destination>
+
+ <!--
+ <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
+ </bean>
+ <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
+ p:connectionFactory-ref="connectionFactory"
+ p:pubSubDomain="false"
+ p:receiveTimeout="1000"
+ />
+
+ <bean id="jmsConf1" class="org.apache.cxf.transport.jms.JMSConfiguration"
+ p:jmsTemplate-ref="jmsTemplate"
+ p:targetDestination="queue:test"
+ p:pubSubDomain="false"
+ />
+ <bean class="org.apache.cxf.transport.jms.JMSTransportFactory"
+ id="org.apache.cxf.transport.jms.JMSTransportFactory"
+ lazy-init="true">
+ <property name="bus" ref="cxf"/>
+ <property name="transportIds">
+ <list>
+ <value>http://cxf.apache.org/transports/jms</value>
+ <value>http://cxf.apache.org/transports/jms/configuration</value>
+ </list>
+ </property>
+ <property name="jmsConfig" ref="jmsConf1"/>
+ </bean>
+ -->
</beans>