You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2008/09/05 09:31:38 UTC
svn commit: r692365 [1/2] - in /cxf/branches/2.0.x-fixes: ./
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/
Author: ningjiang
Date: Fri Sep 5 00:31:37 2008
New Revision: 692365
URL: http://svn.apache.org/viewvc?rev=692365&view=rev
Log:
Merged revisions 690841 via svnmerge from
https://svn.apache.org/repos/asf/cxf/branches/2.1.x-fixes
................
r690841 | ningjiang | 2008-09-01 11:22:08 +0800 (Mon, 01 Sep 2008) | 9 lines
Merged revisions 690638 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r690638 | ningjiang | 2008-08-31 12:49:48 +0800 (Sun, 31 Aug 2008) | 1 line
CXF-1773 applied patch with thanks to Christian
........
................
Added:
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java
- copied unchanged from r690841, cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java
Removed:
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportBase.java
Modified:
cxf/branches/2.0.x-fixes/ (props changed)
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Sep 5 00:31:37 2008
@@ -1 +1 @@
-/cxf/branches/2.1.x-fixes:1-686313,686315-686332,686334-686346,686348-686828,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715
+/cxf/branches/2.1.x-fixes:1-686313,686315-686332,686334-686346,686348-686828,687097,687464-687465,689109,689112,689122,690841,691316,691357,691491,691711,691715
Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=692365&r1=692364&r2=692365&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Fri Sep 5 00:31:37 2008
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.jms;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -29,6 +28,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
@@ -48,104 +48,106 @@
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-public class JMSConduit extends AbstractConduit implements Configurable, JMSTransport {
+public class JMSConduit extends AbstractConduit implements Configurable, JMSOnConnectCallback {
protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
private static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
-
- protected final JMSTransportBase base;
+
+ protected Destination targetDestination;
+ protected Destination replyDestination;
+ protected JMSSessionFactory sessionFactory;
+ protected Bus bus;
+ protected EndpointInfo endpointInfo;
+ protected String beanNameSuffix;
+
protected ClientConfig clientConfig;
protected ClientBehaviorPolicyType runtimePolicy;
protected AddressType address;
protected SessionPoolType sessionPool;
-
+
public JMSConduit(Bus b, EndpointInfo endpointInfo) {
this(b, endpointInfo, null);
}
-
- public JMSConduit(Bus b,
- EndpointInfo endpointInfo,
- EndpointReferenceType target) {
- super(target);
- base = new JMSTransportBase(b, endpointInfo, false, BASE_BEAN_NAME_SUFFIX, this);
-
+ public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
+ super(target);
+
+ this.bus = b;
+ this.endpointInfo = endpointInfo;
+ this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
initConfig();
- }
-
+ }
+
// prepare the message for send out , not actually send out the message
- public void prepare(Message message) throws IOException {
+ public void prepare(Message message) throws IOException {
getLogger().log(Level.FINE, "JMSConduit send message");
try {
- if (null == base.sessionFactory) {
- JMSProviderHub.connect(this);
+ if (null == sessionFactory) {
+ JMSProviderHub.connect(this, getJMSAddress(), getSessionPool());
}
} catch (JMSException jmsex) {
- getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
+ getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
throw new IOException(jmsex.toString());
} catch (NamingException ne) {
getLogger().log(Level.WARNING, "JMS connect failed with NamingException : ", ne);
throw new IOException(ne.toString());
}
- if (base.sessionFactory == null) {
+ if (sessionFactory == null) {
throw new java.lang.IllegalStateException("JMSClientTransport not connected");
}
try {
- boolean isOneWay = false;
- //test if the message is oneway message
+ boolean isOneWay = false;
+ // test if the message is oneway message
Exchange ex = message.getExchange();
if (null != ex) {
isOneWay = ex.isOneWay();
- }
- //get the pooledSession with response expected
- PooledSession pooledSession = base.sessionFactory.get(!isOneWay);
+ }
+ // get the pooledSession with response expected
+ PooledSession pooledSession = sessionFactory.get(!isOneWay);
// put the PooledSession into the outMessage
message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession);
-
+
} catch (JMSException jmsex) {
throw new IOException(jmsex.getMessage());
}
-
- message.setContent(OutputStream.class,
- new JMSOutputStream(message));
-
+
+ message.setContent(OutputStream.class, new JMSOutputStream(message));
+
}
- public void close() {
+ public void close() {
getLogger().log(Level.FINE, "JMSConduit closed ");
// ensure resources held by session factory are released
//
- if (base.sessionFactory != null) {
- base.sessionFactory.shutdown();
+ if (sessionFactory != null) {
+ sessionFactory.shutdown();
}
}
-
+
protected Logger getLogger() {
return LOG;
}
-
/**
* Receive mechanics.
- *
+ *
* @param pooledSession the shared JMS resources
- * @param inMessage
+ * @param inMessage
* @retrun the response buffer
*/
- private Object receive(PooledSession pooledSession,
- Message outMessage, Message inMessage) throws JMSException {
-
+ private Object receive(PooledSession pooledSession, Message outMessage, Message inMessage)
+ throws JMSException {
+
Object result = null;
-
+
long timeout = getClientConfig().getClientReceiveTimeout();
Long receiveTimeout = (Long)outMessage.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
@@ -153,56 +155,52 @@
if (receiveTimeout != null) {
timeout = receiveTimeout.longValue();
}
-
+
javax.jms.Message jmsMessage = pooledSession.consumer().receive(timeout);
- getLogger().log(Level.FINE, "client received reply: " , jmsMessage);
+ getLogger().log(Level.FINE, "client received reply: ", jmsMessage);
if (jmsMessage != null) {
-
- base.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- result = base.unmarshal(jmsMessage);
+
+ JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+ result = JMSUtils.unmarshal(jmsMessage);
return result;
} else {
String error = "JMSClientTransport.receive() timed out. No message available.";
getLogger().log(Level.SEVERE, error);
- //TODO: Review what exception should we throw.
+ // TODO: Review what exception should we throw.
throw new JMSException(error);
-
+
}
}
- public void connected(javax.jms.Destination target,
- javax.jms.Destination reply,
- JMSSessionFactory factory) {
- base.connected(target, reply, factory);
+ public void connected(Destination target, Destination reply, JMSSessionFactory factory) {
+ this.targetDestination = target;
+ this.replyDestination = reply;
+ this.sessionFactory = factory;
}
public String getBeanName() {
- return base.endpointInfo.getName().toString() + ".jms-conduit";
+ return endpointInfo.getName().toString() + ".jms-conduit";
}
-
+
private void initConfig() {
- this.address = base.endpointInfo.getTraversedExtensor(new AddressType(),
- AddressType.class);
- this.sessionPool = base.endpointInfo.getTraversedExtensor(new SessionPoolType(),
- SessionPoolType.class);
- this.clientConfig = base.endpointInfo.getTraversedExtensor(new ClientConfig(),
- ClientConfig.class);
- this.runtimePolicy = base.endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
- ClientBehaviorPolicyType.class);
+ this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+ this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+ this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
+ this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
+ ClientBehaviorPolicyType.class);
- Configurer configurer = base.bus.getExtension(Configurer.class);
+ Configurer configurer = bus.getExtension(Configurer.class);
if (null != configurer) {
configurer.configureBean(this);
}
}
private boolean isTextPayload() {
- return JMSConstants.TEXT_MESSAGE_TYPE.equals(
- getRuntimePolicy().getMessageType().value());
+ return JMSConstants.TEXT_MESSAGE_TYPE.equals(getRuntimePolicy().getMessageType().value());
}
-
+
public AddressType getJMSAddress() {
return address;
}
@@ -235,23 +233,22 @@
this.sessionPool = sessionPool;
}
-
private class JMSOutputStream extends CachedOutputStream {
private Message outMessage;
private javax.jms.Message jmsMessage;
private PooledSession pooledSession;
private boolean isOneWay;
-
+
public JMSOutputStream(Message m) {
outMessage = m;
pooledSession = (PooledSession)outMessage.get(JMSConstants.JMS_POOLEDSESSION);
- }
-
+ }
+
protected void doFlush() throws IOException {
- //do nothing here
+ // do nothing here
}
-
- protected void doClose() throws IOException {
+
+ protected void doClose() throws IOException {
try {
isOneWay = outMessage.getExchange().isOneWay();
commitOutputMessage();
@@ -259,30 +256,28 @@
handleResponse();
}
} catch (JMSException jmsex) {
- getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
+ getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
throw new IOException(jmsex.toString());
} finally {
- base.sessionFactory.recycle(pooledSession);
+ sessionFactory.recycle(pooledSession);
}
}
-
+
protected void onWrite() throws IOException {
-
+
}
-
+
private void commitOutputMessage() throws JMSException {
javax.jms.Destination replyTo = pooledSession.destination();
- //TODO setting up the responseExpected
-
-
- //We don't want to send temp queue in
- //replyTo header for oneway calls
- if (isOneWay
- && (getJMSAddress().getJndiReplyDestinationName() == null)) {
+ // TODO setting up the responseExpected
+
+ // We don't want to send temp queue in
+ // replyTo header for oneway calls
+ if (isOneWay && (getJMSAddress().getJndiReplyDestinationName() == null)) {
replyTo = null;
}
- Object request = null;
+ Object request = null;
try {
if (isTextPayload()) {
StringBuilder builder = new StringBuilder(2048);
@@ -299,37 +294,36 @@
if (getLogger().isLoggable(Level.FINE)) {
getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
}
-
-
- jmsMessage = base.marshal(request, pooledSession.session(), replyTo,
- getRuntimePolicy().getMessageType().value());
-
- JMSMessageHeadersType headers =
- (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
- int deliveryMode = base.getJMSDeliveryMode(headers);
- int priority = base.getJMSPriority(headers);
- String correlationID = base.getCorrelationId(headers);
- long ttl = base.getTimeToLive(headers);
+
+ jmsMessage = JMSUtils.marshal(request, pooledSession.session(), replyTo, getRuntimePolicy()
+ .getMessageType().value());
+
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+ int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
+ int priority = JMSUtils.getJMSPriority(headers);
+ String correlationID = JMSUtils.getCorrelationId(headers);
+ long ttl = JMSUtils.getTimeToLive(headers);
if (ttl <= 0) {
ttl = getClientConfig().getMessageTimeToLive();
}
-
- base.setMessageProperties(headers, jmsMessage);
-
- //ensure that the contentType is set to the out jms message header
- base.setContentToProtocalHeader(outMessage);
- Map<String, List<String>> protHeaders =
- CastUtils.cast((Map<?, ?>)outMessage.get(Message.PROTOCOL_HEADERS));
- base.addProtocolHeaders(jmsMessage, protHeaders);
+
+ JMSUtils.setMessageProperties(headers, jmsMessage);
+ // ensure that the contentType is set to the out jms message header
+ JMSUtils.setContentToProtocalHeader(outMessage);
+ Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+ .get(Message.PROTOCOL_HEADERS));
+ JMSUtils.addProtocolHeaders(jmsMessage, protHeaders);
+
if (!isOneWay) {
String id = pooledSession.getCorrelationID();
if (id != null) {
if (correlationID != null) {
String error = "User cannot set JMSCorrelationID when "
- + "making a request/reply invocation using "
- + "a static replyTo Queue.";
+ + "making a request/reply invocation using "
+ + "a static replyTo Queue.";
throw new JMSException(error);
}
correlationID = id;
@@ -339,48 +333,47 @@
if (correlationID != null) {
jmsMessage.setJMSCorrelationID(correlationID);
} else {
- //No message correlation id is set. Whatever comeback will be accepted as responses.
+ // No message correlation id is set. Whatever comeback will be accepted as responses.
// We assume that it will only happen in case of the temp. reply queue.
}
- getLogger().log(Level.FINE, "client sending request: ", jmsMessage);
- //getting Destination Style
- if (base.isDestinationStyleQueue()) {
+ getLogger().log(Level.FINE, "client sending request: ", jmsMessage);
+ // getting Destination Style
+ if (JMSUtils.isDestinationStyleQueue(address)) {
QueueSender sender = (QueueSender)pooledSession.producer();
sender.setTimeToLive(ttl);
- sender.send((Queue)base.targetDestination, jmsMessage, deliveryMode, priority, ttl);
+ sender.send((Queue)targetDestination, jmsMessage, deliveryMode, priority, ttl);
} else {
TopicPublisher publisher = (TopicPublisher)pooledSession.producer();
publisher.setTimeToLive(ttl);
- publisher.publish((Topic)base.targetDestination, jmsMessage, deliveryMode, priority, ttl);
+ publisher.publish((Topic)targetDestination, jmsMessage, deliveryMode, priority, ttl);
}
}
private void handleResponse() throws IOException {
// REVISIT distinguish decoupled case or oneway call
Object response = null;
-
- //TODO if outMessage need to get the response
+
+ // TODO if outMessage need to get the response
Message inMessage = new MessageImpl();
- outMessage.getExchange().setInMessage(inMessage);
- //set the message header back to the incomeMessage
- //inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
- // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
-
+ outMessage.getExchange().setInMessage(inMessage);
+ // set the message header back to the incomeMessage
+ // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
+ // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
+
try {
response = receive(pooledSession, outMessage, inMessage);
} catch (JMSException jmsex) {
- getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);
+ getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);
throw new IOException(jmsex.toString());
- }
-
+ }
getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
-
+
// setup the inMessage response stream
byte[] bytes = null;
if (response instanceof String) {
- String requestString = (String)response;
+ String requestString = (String)response;
bytes = requestString.getBytes();
} else {
bytes = (byte[])response;
@@ -392,16 +385,14 @@
}
-
/**
* Represented decoupled response endpoint.
*/
protected class DecoupledDestination implements Destination {
protected MessageObserver decoupledMessageObserver;
private EndpointReferenceType address;
-
- DecoupledDestination(EndpointReferenceType ref,
- MessageObserver incomingObserver) {
+
+ DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
address = ref;
decoupledMessageObserver = incomingObserver;
}
@@ -410,25 +401,23 @@
return address;
}
- public Conduit getBackChannel(Message inMessage,
- Message partialResponse,
- EndpointReferenceType addr)
+ public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr)
throws IOException {
// shouldn't be called on decoupled endpoint
return null;
}
public void shutdown() {
- // TODO Auto-generated method stub
+ // TODO Auto-generated method stub
}
public synchronized void setMessageObserver(MessageObserver observer) {
decoupledMessageObserver = observer;
}
-
+
public synchronized MessageObserver getMessageObserver() {
return decoupledMessageObserver;
}
- }
+ }
}
Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=692365&r1=692364&r2=692365&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Sep 5 00:31:37 2008
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.jms;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -36,6 +35,7 @@
import java.util.logging.Logger;
import javax.jms.BytesMessage;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
@@ -62,32 +62,36 @@
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
+public class JMSDestination extends AbstractMultiplexDestination implements Configurable,
+ JMSOnConnectCallback {
-
-public class JMSDestination extends AbstractMultiplexDestination implements Configurable, JMSTransport {
-
protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
-
+
protected ServerConfig serverConfig;
protected ServerBehaviorPolicyType runtimePolicy;
protected AddressType address;
protected SessionPoolType sessionPool;
-
+ protected Destination targetDestination;
+ protected Destination replyDestination;
+ protected JMSSessionFactory sessionFactory;
+ protected Bus bus;
+ protected EndpointInfo endpointInfo;
+ protected String beanNameSuffix;
+
final ConduitInitiator conduitInitiator;
- final JMSTransportBase base;
-
+
+
PooledSession listenerSession;
JMSListenerThread listenerThread;
-
- public JMSDestination(Bus b,
- ConduitInitiator ci,
- EndpointInfo info) throws IOException {
- super(b, getTargetReference(info, b), info);
-
- base = new JMSTransportBase(b, endpointInfo, true, BASE_BEAN_NAME_SUFFIX, this);
+ public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException {
+ super(b, getTargetReference(info, b), info);
+
+ this.bus = b;
+ this.endpointInfo = info;
+ this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
conduitInitiator = ci;
initConfig();
@@ -96,24 +100,23 @@
protected Logger getLogger() {
return LOG;
}
-
+
/**
* @param inMessage the incoming message
* @return the inbuilt backchannel
*/
protected Conduit getInbuiltBackChannel(Message inMessage) {
- return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
- inMessage);
+ return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage);
}
-
- public void activate() {
- getLogger().log(Level.INFO, "JMSServerTransport activate().... ");
+
+ public void activate() {
+ getLogger().log(Level.INFO, "JMSServerTransport activate().... ");
try {
getLogger().log(Level.FINE, "establishing JMS connection");
- JMSProviderHub.connect(this, serverConfig, runtimePolicy);
+ JMSProviderHub.connect(this, getJMSAddress(), getSessionPool(), serverConfig, runtimePolicy);
//Get a non-pooled session.
- listenerSession = base.sessionFactory.get(base.targetDestination);
+ listenerSession = sessionFactory.get(targetDestination);
listenerThread = new JMSListenerThread(listenerSession,
endpointInfo.getName());
listenerThread.start();
@@ -123,18 +126,18 @@
getLogger().log(Level.SEVERE, "JMS connect failed with NamingException : ", nex);
}
}
-
- public void deactivate() {
+
+ public void deactivate() {
try {
listenerSession.consumer().close();
if (listenerThread != null) {
listenerThread.join();
}
- base.sessionFactory.shutdown();
+ sessionFactory.shutdown();
} catch (InterruptedException e) {
- //Do nothing here
+ // Do nothing here
} catch (JMSException ex) {
- //Do nothing here
+ // Do nothing here
}
}
@@ -143,43 +146,40 @@
this.deactivate();
}
- public Queue getReplyToDestination(Message inMessage)
- throws JMSException, NamingException {
+ public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException {
Queue replyTo;
- javax.jms.Message message =
- (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+ javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
// If WS-Addressing had set the replyTo header.
- if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
- replyTo = base.sessionFactory.getQueueFromInitialContext(
- (String) inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO));
+ if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
+ replyTo = sessionFactory.getQueueFromInitialContext((String)inMessage
+ .get(JMSConstants.JMS_REBASED_REPLY_TO));
} else {
- replyTo = (null != message.getJMSReplyTo())
- ? (Queue)message.getJMSReplyTo() : (Queue)base.replyDestination;
- }
+ replyTo = (null != message.getJMSReplyTo())
+ ? (Queue)message.getJMSReplyTo() : (Queue)replyDestination;
+ }
return replyTo;
}
-
- public void setReplyCorrelationID(javax.jms.Message request, javax.jms.Message reply)
- throws JMSException {
-
+
+ public void setReplyCorrelationID(javax.jms.Message request,
+ javax.jms.Message reply) throws JMSException {
+
String correlationID = request.getJMSCorrelationID();
-
- if (correlationID == null
- || "".equals(correlationID)
+
+ if (correlationID == null || "".equals(correlationID)
&& getRuntimePolicy().isUseMessageIDAsCorrelationID()) {
correlationID = request.getJMSMessageID();
}
-
+
if (correlationID != null && !"".equals(correlationID)) {
reply.setJMSCorrelationID(correlationID);
}
}
-
+
protected void incoming(javax.jms.Message message) throws IOException {
try {
getLogger().log(Level.FINE, "server received request: ", message);
-
- Object request = base.unmarshal(message);
+
+ Object request = JMSUtils.unmarshal(message);
getLogger().log(Level.FINE, "The Request Message is [ " + request + "]");
byte[] bytes = null;
@@ -188,50 +188,52 @@
getLogger().log(Level.FINE, "server received request: ", requestString);
bytes = requestString.getBytes();
} else {
- //Both ByteMessage and ObjectMessage would get unmarshalled to byte array.
+ // Both ByteMessage and ObjectMessage would get unmarshalled to byte array.
bytes = (byte[])request;
}
// get the message to be interceptor
MessageImpl inMessage = new MessageImpl();
inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
- base.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+ JMSUtils.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
-
- inMessage.setDestination(this);
-
+
+ inMessage.setDestination(this);
+
BusFactory.setThreadDefaultBus(bus);
-
- //handle the incoming message
+
+ // handle the incoming message
incomingObserver.onMessage(inMessage);
-
+
} catch (JMSException jmsex) {
- //TODO: need to revisit for which exception should we throw.
+ // TODO: need to revisit for which exception should we throw.
throw new IOException(jmsex.getMessage());
} finally {
BusFactory.setThreadDefaultBus(null);
}
}
-
+
public void connected(javax.jms.Destination target,
javax.jms.Destination reply,
JMSSessionFactory factory) {
- base.connected(target, reply, factory);
+ this.targetDestination = target;
+ this.replyDestination = reply;
+ this.sessionFactory = factory;
}
public String getBeanName() {
return endpointInfo.getName().toString() + ".jms-destination";
}
-
+
private void initConfig() {
this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
ServerBehaviorPolicyType.class);
this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
-
- Configurer configurer = base.bus.getExtension(Configurer.class);
+
+ Configurer configurer = bus.getExtension(Configurer.class);
if (null != configurer) {
configurer.configureBean(this);
}
@@ -268,10 +270,11 @@
public void setSessionPool(SessionPoolType sessionPool) {
this.sessionPool = sessionPool;
}
-
+
protected class JMSListenerThread extends Thread {
private final PooledSession listenSession;
private final QName name;
+
public JMSListenerThread(PooledSession session, QName n) {
listenSession = session;
name = n;
@@ -281,49 +284,47 @@
try {
Executor executor = null;
if (executor == null) {
- WorkQueueManager wqm =
- base.bus.getExtension(WorkQueueManager.class);
+ WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
if (null != wqm) {
if (name != null) {
- executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
+ executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
+ name.getLocalPart());
- }
+ }
if (executor == null) {
executor = wqm.getNamedWorkQueue("jms");
}
if (executor == null) {
executor = wqm.getAutomaticWorkQueue();
}
- }
+ }
}
while (true) {
- javax.jms.Message message = listenSession.consumer().receive();
+ javax.jms.Message message = listenSession.consumer().receive();
if (message == null) {
- getLogger().log(Level.WARNING,
- "Null message received from message consumer.",
- " Exiting ListenerThread::run().");
+ getLogger().log(Level.WARNING, "Null message received from message consumer.",
+ " Exiting ListenerThread::run().");
return;
}
while (message != null) {
- //REVISIT to get the thread pool
- //Executor executor = jmsDestination.callback.getExecutor();
+ // REVISIT to get the thread pool
+ // Executor executor = jmsDestination.callback.getExecutor();
if (executor != null) {
try {
executor.execute(new JMSExecutor(message));
message = null;
} catch (RejectedExecutionException ree) {
- //FIXME - no room left on workqueue, what to do
- //for now, loop until it WILL fit on the queue,
- //although we could just dispatch on this thread.
- }
+ // FIXME - no room left on workqueue, what to do
+ // for now, loop until it WILL fit on the queue,
+ // although we could just dispatch on this thread.
+ }
} else {
getLogger().log(Level.INFO, "handle the incoming message in listener thread");
try {
incoming(message);
} catch (IOException ex) {
getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
- }
- }
+ }
+ }
message = null;
}
}
@@ -336,10 +337,10 @@
}
}
}
-
+
protected class JMSExecutor implements Runnable {
javax.jms.Message message;
-
+
JMSExecutor(javax.jms.Message m) {
message = m;
}
@@ -349,24 +350,23 @@
try {
incoming(message);
} catch (IOException ex) {
- //TODO: Decide what to do if we receive the exception.
- getLogger().log(Level.WARNING,
- "Failed to process incoming message : ", ex);
+ // TODO: Decide what to do if we receive the exception.
+ getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
}
}
-
+
}
-
- // this should deal with the cxf message
+
+ // this should deal with the cxf message
protected class BackChannelConduit extends AbstractConduit {
-
+
protected Message inMessage;
-
+
BackChannelConduit(EndpointReferenceType ref, Message message) {
super(ref);
inMessage = message;
}
-
+
/**
* Register a message observer for incoming messages.
*
@@ -377,65 +377,62 @@
}
/**
- * Send an outbound message, assumed to contain all the name-value
- * mappings of the corresponding input message (if any).
+ * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input
+ * message (if any).
*
* @param message the message to be sent.
*/
public void prepare(Message message) throws IOException {
// setup the message to be send back
- message.put(JMSConstants.JMS_REQUEST_MESSAGE,
- inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
-
+ message.put(JMSConstants.JMS_REQUEST_MESSAGE, inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
+
if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)
&& inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) {
- message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS,
- inMessage.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
+ message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
+ .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
}
- message.setContent(OutputStream.class,
- new JMSOutputStream(inMessage, message));
+ message.setContent(OutputStream.class, new JMSOutputStream(inMessage, message));
}
-
+
protected Logger getLogger() {
return LOG;
}
}
-
+
private class JMSOutputStream extends CachedOutputStream {
-
+
private Message inMessage;
private Message outMessage;
private javax.jms.Message reply;
private Queue replyTo;
private QueueSender sender;
-
+
// setup the ByteArrayStream
public JMSOutputStream(Message m, Message o) {
super();
inMessage = m;
outMessage = o;
}
-
- //to prepear the message and get the send out message
+
+ // to prepear the message and get the send out message
private void commitOutputMessage() throws IOException {
-
- JMSMessageHeadersType headers =
- (JMSMessageHeadersType) outMessage.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
- javax.jms.Message request =
- (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-
- PooledSession replySession = null;
-
- if (base.isDestinationStyleQueue()) {
+
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+ javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+
+ PooledSession replySession = null;
+
+ if (JMSUtils.isDestinationStyleQueue(address)) {
try {
- //setup the reply message
+ // setup the reply message
replyTo = getReplyToDestination(inMessage);
- replySession = base.sessionFactory.get(false);
+ replySession = sessionFactory.get(false);
sender = (QueueSender)replySession.producer();
-
+
String msgType = JMSConstants.TEXT_MESSAGE_TYPE;
Object replyObj = null;
-
+
if (request instanceof TextMessage) {
StringBuilder builder = new StringBuilder();
this.writeCacheTo(builder);
@@ -448,39 +445,38 @@
replyObj = getBytes();
msgType = JMSConstants.BINARY_MESSAGE_TYPE;
}
-
+
if (getLogger().isLoggable(Level.FINE)) {
+
getLogger().log(Level.FINE, "The response message is ["
+ (replyObj instanceof String
? (String)replyObj : new String((byte[])replyObj))
+ "]");
+
}
- reply = base.marshal(replyObj,
- replySession.session(),
- null,
- msgType);
+ reply = JMSUtils.marshal(replyObj, replySession.session(), null, msgType);
setReplyCorrelationID(request, reply);
- base.setMessageProperties(headers, reply);
- //ensure that the contentType is set to the out jms message header
- base.setContentToProtocalHeader(outMessage);
- Map<String, List<String>> protHeaders =
- CastUtils.cast((Map<?, ?>)outMessage.get(Message.PROTOCOL_HEADERS));
- base.addProtocolHeaders(reply, protHeaders);
+ JMSUtils.setMessageProperties(headers, reply);
+ // ensure that the contentType is set to the out jms message header
+ JMSUtils.setContentToProtocalHeader(outMessage);
+ Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+ .get(Message.PROTOCOL_HEADERS));
+ JMSUtils.addProtocolHeaders(reply, protHeaders);
sendResponse();
-
+
} catch (JMSException ex) {
- getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
- throw new IOException(ex.getMessage());
+ getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
+ throw new IOException(ex.getMessage());
} catch (NamingException nex) {
- getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
- throw new IOException(nex.getMessage());
+ getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
+ throw new IOException(nex.getMessage());
} finally {
// house-keeping
if (replySession != null) {
- base.sessionFactory.recycle(replySession);
+ sessionFactory.recycle(replySession);
}
}
} else {
@@ -488,39 +484,36 @@
// domain from CXF client - however a mis-behaving pure JMS
// client could conceivably make suce an invocation, in which
// case we silently discard the reply
- getLogger().log(Level.WARNING,
- "discarding reply for non-oneway invocation ",
- "with 'topic' destinationStyle");
-
- }
-
+ getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
+ "with 'topic' destinationStyle");
+
+ }
+
getLogger().log(Level.FINE, "just server sending reply: ", reply);
// Check the reply time limit Stream close will call for this
-
-
+
}
private void sendResponse() throws JMSException {
- JMSMessageHeadersType headers =
- (JMSMessageHeadersType) inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
- javax.jms.Message request =
- (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-
- int deliveryMode = base.getJMSDeliveryMode(headers);
- int priority = base.getJMSPriority(headers);
- long ttl = base.getTimeToLive(headers);
-
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)inMessage
+ .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+ javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+
+ int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
+ int priority = JMSUtils.getJMSPriority(headers);
+ long ttl = JMSUtils.getTimeToLive(headers);
+
if (ttl <= 0) {
ttl = getServerConfig().getMessageTimeToLive();
}
-
+
long timeToLive = 0;
if (request.getJMSExpiration() > 0) {
TimeZone tz = new SimpleTimeZone(0, "GMT");
Calendar cal = new GregorianCalendar(tz);
- timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+ timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
}
-
+
if (timeToLive >= 0) {
ttl = ttl > 0 ? ttl : timeToLive;
getLogger().log(Level.FINE, "send out the message!");
@@ -528,27 +521,24 @@
} else {
// the request message had dead
getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
- }
+ }
}
-
-
@Override
protected void doFlush() throws IOException {
// TODO Auto-generated method stub
-
+
}
-
@Override
protected void doClose() throws IOException {
-
- commitOutputMessage();
+
+ commitOutputMessage();
}
@Override
protected void onWrite() throws IOException {
- // Do nothing here
+ // Do nothing here
}
}
Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java?rev=692365&r1=692364&r2=692365&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java Fri Sep 5 00:31:37 2008
@@ -28,20 +28,16 @@
import javax.naming.Context;
import javax.naming.NamingException;
-
/**
- * This class acts as the hub of JMS provider usage, creating shared
- * JMS Connections and providing access to a pool of JMS Sessions.
+ * This class acts as the hub of JMS provider usage, creating shared JMS Connections and providing access to a
+ * pool of JMS Sessions.
* <p>
- * A new JMS connection is created for each each port based
- * <jms:address> - however its likely that in practice the same JMS
- * provider will be specified for each port, and hence the connection
- * resources could be shared accross ports.
+ * A new JMS connection is created for each each port based <jms:address> - however its likely that in
+ * practice the same JMS provider will be specified for each port, and hence the connection resources could be
+ * shared accross ports.
* <p>
- * For the moment this class is realized as just a container for
- * static methods, but the intention is to support in future sharing
- * of JMS resources accross compatible ports.
- *
+ * For the moment this class is realized as just a container for static methods, but the intention is to
+ * support in future sharing of JMS resources accross compatible ports.
*/
public final class JMSProviderHub {
@@ -51,111 +47,115 @@
private JMSProviderHub() {
}
-
public String toString() {
return "JMSProviderHub";
}
- protected static void connect(JMSTransport jmsTransport) throws JMSException, NamingException {
- connect(jmsTransport, null, null);
+ protected static void connect(JMSOnConnectCallback onConnectCallback, AddressType addrDetails,
+ SessionPoolType sessionPoolConfig) throws JMSException, NamingException {
+ connect(onConnectCallback, addrDetails, sessionPoolConfig, null, null);
}
-
- protected static void connect(JMSTransport jmsTransport,
- ServerConfig jmsDestConfigBean,
- ServerBehaviorPolicyType runtimePolicy)
- throws JMSException, NamingException {
-
- AddressType addrDetails = jmsTransport.getJMSAddress();
- boolean isQueue = JMSConstants.JMS_QUEUE.equals(addrDetails.getDestinationStyle().value());
-
+
+ private static Destination resolveRequestDestination(Context context, Connection connection,
+ AddressType addrDetails) throws JMSException,
+ NamingException {
+ Destination requestDestination = null;
+ try {
+ // see if jndiDestination is set
+ if (addrDetails.getJndiDestinationName() != null) {
+ requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName());
+ }
+
+ // if no jndiDestination or it fails see if jmsDestination is set
+ // and try to create it.
+ if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
+ if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+ requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ .createQueue(addrDetails.getJmsDestinationName());
+ } else {
+ requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ .createTopic(addrDetails.getJmsDestinationName());
+ }
+ }
+ return requestDestination;
+ } catch (NamingException ne) {
+ // Propogate NamingException.
+ throw ne;
+ }
+ }
+
+ protected static void connect(JMSOnConnectCallback onConnectCallBack, AddressType addrDetails,
+ SessionPoolType sessionPoolConfig, ServerConfig jmsDestConfigBean,
+ ServerBehaviorPolicyType runtimePolicy) throws JMSException,
+ NamingException {
+
// get JMS connection resources and destination
//
Context context = JMSUtils.getInitialContext(addrDetails);
Connection connection = null;
-
- if (isQueue) {
- QueueConnectionFactory qcf =
- (QueueConnectionFactory)context.lookup(addrDetails.getJndiConnectionFactoryName());
+
+ if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+ QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(addrDetails
+ .getJndiConnectionFactoryName());
if (addrDetails.isSetConnectionUserName()) {
- connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(),
- addrDetails.getConnectionPassword());
+ connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), addrDetails
+ .getConnectionPassword());
} else {
connection = qcf.createQueueConnection();
}
} else {
- TopicConnectionFactory tcf =
- (TopicConnectionFactory)context.lookup(addrDetails.getJndiConnectionFactoryName());
+ TopicConnectionFactory tcf = (TopicConnectionFactory)context.lookup(addrDetails
+ .getJndiConnectionFactoryName());
if (addrDetails.isSetConnectionUserName()) {
- connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(),
- addrDetails.getConnectionPassword());
+ connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), addrDetails
+ .getConnectionPassword());
} else {
connection = tcf.createTopicConnection();
}
}
-
+
if (null != jmsDestConfigBean) {
String clientID = jmsDestConfigBean.getDurableSubscriptionClientId();
-
- if (clientID != null) {
+
+ if (clientID != null) {
connection.setClientID(clientID);
}
}
connection.start();
-
- Destination requestDestination = null;
- try {
- //see if jndiDestination is set
- if (addrDetails.getJndiDestinationName() != null) {
- requestDestination =
- (Destination)context.lookup(addrDetails.getJndiDestinationName());
- }
-
- //if no jndiDestination or it fails see if jmsDestination is set and try to create it.
- if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
- if (isQueue) {
- requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- .createQueue(addrDetails.getJmsDestinationName());
- } else {
- requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- .createTopic(addrDetails.getJmsDestinationName());
- }
- }
-
- if (requestDestination == null) {
- //fail to locate or create requestDestination throw Exception.
- throw new JMSException("Failed to lookup or create requestDestination");
- }
-
- } catch (NamingException ne) {
- //Propogate NamingException.
- throw ne;
+ Destination requestDestination = resolveRequestDestination(context, connection, addrDetails);
+ if (requestDestination == null) {
+ // fail to locate or create requestDestination throw Exception.
+ throw new JMSException("Failed to lookup or create requestDestination");
}
-
+
+ Destination replyDestination = resolveReplyDestination(addrDetails, context, connection);
+
+ // create session factory to manage session, reply destination,
+ // producer and consumer pooling
+ //
+ JMSSessionFactory sf = new JMSSessionFactory(connection, replyDestination, context, JMSUtils
+ .isDestinationStyleQueue(addrDetails), sessionPoolConfig, runtimePolicy);
+
+ // notify transport that connection is complete
+ onConnectCallBack.connected(requestDestination, replyDestination, sf);
+ }
+
+ private static Destination resolveReplyDestination(AddressType addrDetails, Context context,
+ Connection connection) throws NamingException,
+ JMSException {
Destination replyDestination = null;
-
- //Reply Destination is used (if present) only if the session is point-to-point session
- if (isQueue) {
+
+ // Reply Destination is used (if present) only if the session is
+ // point-to-point session
+ if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
if (addrDetails.getJndiReplyDestinationName() != null) {
replyDestination = (Destination)context.lookup(addrDetails.getJndiReplyDestinationName());
- }
+ }
if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) {
replyDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
.createQueue(addrDetails.getJmsReplyDestinationName());
}
}
-
- // create session factory to manage session, reply destination,
- // producer and consumer pooling
- //
-
- JMSSessionFactory sf =
- new JMSSessionFactory(connection,
- replyDestination,
- context,
- jmsTransport,
- runtimePolicy);
-
- // notify transport that connection is complete
- jmsTransport.connected(requestDestination, replyDestination, sf);
+ return replyDestination;
}
}
Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java?rev=692365&r1=692364&r2=692365&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java Fri Sep 5 00:31:37 2008
@@ -44,114 +44,92 @@
import org.apache.cxf.common.util.AbstractTwoStageCache;
/**
- * This class encapsulates the creation and pooling logic for JMS Sessions.
- * The usage patterns for sessions, producers & consumers are as follows ...
+ * This class encapsulates the creation and pooling logic for JMS Sessions. The usage patterns for sessions,
+ * producers & consumers are as follows ...
* <p>
- * client-side: an invoking thread requires relatively short-term exclusive
- * use of a session, an unidentified producer to send the request message,
- * and in the point-to-point domain a consumer for the temporary ReplyTo
- * destination to synchronously receive the reply if the operation is twoway
- * (in the pub-sub domain only oneway operations are supported, so a there
- * is never a requirement for a reply destination)
+ * client-side: an invoking thread requires relatively short-term exclusive use of a session, an unidentified
+ * producer to send the request message, and in the point-to-point domain a consumer for the temporary ReplyTo
+ * destination to synchronously receive the reply if the operation is twoway (in the pub-sub domain only
+ * oneway operations are supported, so a there is never a requirement for a reply destination)
* <p>
- * server-side receive: each port based on <jms:address> requires relatively
- * long-term exclusive use of a session, a consumer with a MessageListener for
- * the JMS destination specified for the port, and an unidentified producer
- * to send the request message
+ * server-side receive: each port based on <jms:address> requires relatively long-term exclusive use of a
+ * session, a consumer with a MessageListener for the JMS destination specified for the port, and an
+ * unidentified producer to send the request message
* <p>
- * server-side send: each dispatch of a twoway request requires relatively
- * short-term exclusive use of a session and an indentified producer (but
- * not a consumer) - note that the session used for the recieve side cannot
- * be re-used for the send, as MessageListener usage precludes any synchronous
- * sends or receives on that session
+ * server-side send: each dispatch of a twoway request requires relatively short-term exclusive use of a
+ * session and an indentified producer (but not a consumer) - note that the session used for the recieve side
+ * cannot be re-used for the send, as MessageListener usage precludes any synchronous sends or receives on
+ * that session
* <p>
- * So on the client-side, pooling of sessions is bound up with pooling
- * of temporary reply destinations, whereas on the server receive side
- * the benefit of pooling is marginal as the session is required from
- * the point at which the port was activated until the Bus is shutdown
- * The server send side resembles the client side,
- * except that a consumer for the temporary destination is never required.
- * Hence different pooling strategies make sense ...
+ * So on the client-side, pooling of sessions is bound up with pooling of temporary reply destinations,
+ * whereas on the server receive side the benefit of pooling is marginal as the session is required from the
+ * point at which the port was activated until the Bus is shutdown The server send side resembles the client
+ * side, except that a consumer for the temporary destination is never required. Hence different pooling
+ * strategies make sense ...
* <p>
- * client-side: a SoftReference-based cache of send/receive sessions is
- * maintained containing an aggregate of a session, indentified producer,
- * temporary reply destination & consumer for same
+ * client-side: a SoftReference-based cache of send/receive sessions is maintained containing an aggregate of
+ * a session, indentified producer, temporary reply destination & consumer for same
* <p>
- * server-side receive: as sessions cannot be usefully recycled, they are
- * simply created on demand and closed when no longer required
+ * server-side receive: as sessions cannot be usefully recycled, they are simply created on demand and closed
+ * when no longer required
* <p>
- * server-side send: a SoftReference-based cache of send-only sessions is
- * maintained containing an aggregate of a session and an indentified producer
+ * server-side send: a SoftReference-based cache of send-only sessions is maintained containing an aggregate
+ * of a session and an indentified producer
* <p>
- * In a pure client or pure server, only a single cache is ever
- * populated. Where client and server logic is co-located, a client
- * session retrieval for a twoway invocation checks the reply-capable
- * cache first and then the send-only cache - if a session is
- * available in the later then its used after a tempory destination is
- * created before being recycled back into the reply-capable cache. A
- * server send side retrieval or client retrieval for a oneway
- * invocation checks the send-only cache first and then the
- * reply-capable cache - if a session is available in the later then
- * its used and the tempory destination is ignored. So in the
- * co-located case, sessions migrate from the send-only cache to the
- * reply-capable cache as necessary.
+ * In a pure client or pure server, only a single cache is ever populated. Where client and server logic is
+ * co-located, a client session retrieval for a twoway invocation checks the reply-capable cache first and
+ * then the send-only cache - if a session is available in the later then its used after a tempory destination
+ * is created before being recycled back into the reply-capable cache. A server send side retrieval or client
+ * retrieval for a oneway invocation checks the send-only cache first and then the reply-capable cache - if a
+ * session is available in the later then its used and the tempory destination is ignored. So in the
+ * co-located case, sessions migrate from the send-only cache to the reply-capable cache as necessary.
* <p>
- *
*/
public class JMSSessionFactory {
private static final Logger LOG = LogUtils.getL7dLogger(JMSSessionFactory.class);
-
+
private int lowWaterMark;
private int highWaterMark;
private final Context initialContext;
- private final Connection theConnection;
+ private final Connection theConnection;
private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
private final Destination theReplyDestination;
- private final JMSTransport jmsTransport;
private final ServerBehaviorPolicyType runtimePolicy;
-
+ private boolean destinationIsQueue;
+
/**
* Constructor.
- *
+ *
* @param connection the shared {Queue|Topic}Connection
*/
- public JMSSessionFactory(Connection connection,
- Destination replyDestination,
- Context context,
- JMSTransport tbb,
+ public JMSSessionFactory(Connection connection, Destination replyDestination, Context context,
+ boolean destinationIsQueue, SessionPoolType sessionPoolConfig,
ServerBehaviorPolicyType runtimePolicy) {
theConnection = connection;
theReplyDestination = replyDestination;
initialContext = context;
- jmsTransport = tbb;
this.runtimePolicy = runtimePolicy;
-
- SessionPoolType sessionPoolConfig = jmsTransport.getSessionPool();
-
+
lowWaterMark = sessionPoolConfig.getLowWaterMark();
highWaterMark = sessionPoolConfig.getHighWaterMark();
-
+ this.destinationIsQueue = destinationIsQueue;
// create session caches (REVISIT sizes should be configurable)
//
- if (isDestinationStyleQueue()) {
+ if (destinationIsQueue) {
// the reply capable cache is only required in the point-to-point
// domain
//
- replyCapableSessionCache =
- new AbstractTwoStageCache<PooledSession>(
- lowWaterMark,
- highWaterMark,
- 0,
- this) {
- public final PooledSession create() throws JMSException {
- return createPointToPointReplyCapableSession();
- }
- };
+ replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark,
+ 0, this) {
+ public final PooledSession create() throws JMSException {
+ return createPointToPointReplyCapableSession();
+ }
+ };
try {
replyCapableSessionCache.populateCache();
@@ -161,16 +139,12 @@
// send-only cache for point-to-point oneway requests and replies
//
- sendOnlySessionCache =
- new AbstractTwoStageCache<PooledSession>(
- lowWaterMark,
- highWaterMark,
- 0,
- this) {
- public final PooledSession create() throws JMSException {
- return createPointToPointSendOnlySession();
- }
- };
+ sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
+ this) {
+ public final PooledSession create() throws JMSException {
+ return createPointToPointSendOnlySession();
+ }
+ };
try {
sendOnlySessionCache.populateCache();
@@ -180,16 +154,12 @@
} else {
// send-only cache for pub-sub oneway requests
//
- sendOnlySessionCache =
- new AbstractTwoStageCache<PooledSession>(
- lowWaterMark,
- highWaterMark,
- 0,
- this) {
- public final PooledSession create() throws JMSException {
- return createPubSubSession(true, false, null);
- }
- };
+ sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
+ this) {
+ public final PooledSession create() throws JMSException {
+ return createPubSubSession(true, false, null);
+ }
+ };
try {
sendOnlySessionCache.populateCache();
@@ -199,31 +169,30 @@
}
}
- //--java.lang.Object Overrides----------------------------------------------
+ // --java.lang.Object Overrides----------------------------------------------
public String toString() {
return "JMSSessionFactory";
}
-
- //--Methods-----------------------------------------------------------------
+ // --Methods-----------------------------------------------------------------
protected Connection getConnection() {
return theConnection;
}
- public Queue getQueueFromInitialContext(String queueName)
- throws NamingException {
- return (Queue) initialContext.lookup(queueName);
+ public Queue getQueueFromInitialContext(String queueName) throws NamingException {
+ return (Queue)initialContext.lookup(queueName);
}
public PooledSession get(boolean replyCapable) throws JMSException {
return get(null, replyCapable);
}
-
+
/**
* Retrieve a new or cached Session.
+ *
* @param replyDest Destination name if coming from wsa:Header
- * @param replyCapable true iff the session is to be used to receive replies
- * (implies client side twoway invocation )
+ * @param replyCapable true iff the session is to be used to receive replies (implies client side twoway
+ * invocation )
* @return a new or cached Session
*/
public PooledSession get(Destination replyDest, boolean replyCapable) throws JMSException {
@@ -245,19 +214,19 @@
QueueSession session = (QueueSession)ret.session();
Queue destination = null;
String selector = null;
-
+
if (null != theReplyDestination || null != replyDest) {
- destination = null != replyDest ? (Queue) replyDest : (Queue)theReplyDestination;
-
+ destination = null != replyDest ? (Queue)replyDest : (Queue)theReplyDestination;
+
selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
}
-
+
if (destination == null) {
- //neither replyDestination not replyDest are present.
+ // neither replyDestination not replyDest are present.
destination = session.createTemporaryQueue();
selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
}
-
+
ret.destination(destination);
MessageConsumer consumer = session.createReceiver(destination, selector);
ret.consumer(consumer);
@@ -309,9 +278,8 @@
/**
* Retrieve a new
- *
- * @param destination the target JMS queue or topic (non-null implies
- * server receive side)
+ *
+ * @param destination the target JMS queue or topic (non-null implies server receive side)
* @return a new or cached Session
*/
public PooledSession get(Destination destination) throws JMSException {
@@ -320,7 +288,7 @@
// the destination is only specified on the server receive side,
// in which case a new session is always created
//
- if (isDestinationStyleQueue()) {
+ if (destinationIsQueue) {
ret = createPointToPointServerSession(destination);
} else {
ret = createPubSubSession(false, true, destination);
@@ -331,7 +299,7 @@
/**
* Return a Session to the pool
- *
+ *
* @param pooled_session the session to recycle
*/
public void recycle(PooledSession pooledSession) {
@@ -343,8 +311,9 @@
synchronized (this) {
// re-cache session, closing if it cannot be it can be accomodated
//
- discard = replyCapable ? (!replyCapableSessionCache.recycle(pooledSession))
- : (!sendOnlySessionCache.recycle(pooledSession));
+ discard = replyCapable
+ ? (!replyCapableSessionCache.recycle(pooledSession)) : (!sendOnlySessionCache
+ .recycle(pooledSession));
}
if (discard) {
@@ -356,7 +325,6 @@
}
}
-
/**
* Shutdown the session factory.
*/
@@ -391,79 +359,69 @@
sendOnlySessionCache = null;
}
-
/**
* Helper method to create a point-to-point pooled session.
- *
+ *
* @param producer true iff producing
* @param consumer true iff consuming
* @param destination the target destination
* @return an appropriate pooled session
*/
PooledSession createPointToPointReplyCapableSession() throws JMSException {
- QueueSession session =
- ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
+ Session.AUTO_ACKNOWLEDGE);
Destination destination = null;
String selector = null;
-
+
if (null != theReplyDestination) {
destination = theReplyDestination;
-
- selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
-
-
+
+ selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
+
} else {
destination = session.createTemporaryQueue();
}
-
+
MessageConsumer consumer = session.createReceiver((Queue)destination, selector);
- return new PooledSession(session,
- destination,
- session.createSender(null),
- consumer);
+ return new PooledSession(session, destination, session.createSender(null), consumer);
}
-
/**
* Helper method to create a point-to-point pooled session.
- *
+ *
* @return an appropriate pooled session
*/
PooledSession createPointToPointSendOnlySession() throws JMSException {
- QueueSession session =
- ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
+ Session.AUTO_ACKNOWLEDGE);
return new PooledSession(session, null, session.createSender(null), null);
}
-
/**
* Helper method to create a point-to-point pooled session for consumer only.
- *
+ *
* @param destination the target destination
* @return an appropriate pooled session
*/
private PooledSession createPointToPointServerSession(Destination destination) throws JMSException {
- QueueSession session =
- ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
- return new PooledSession(session, destination, session.createSender(null),
- session.createReceiver((Queue)destination,
- runtimePolicy.getMessageSelector()));
- }
+ QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ return new PooledSession(session, destination, session.createSender(null), session
+ .createReceiver((Queue)destination, runtimePolicy.getMessageSelector()));
+ }
/**
* Helper method to create a pub-sub pooled session.
- *
+ *
* @param producer true iff producing
* @param consumer true iff consuming
* @param destination the target destination
* @return an appropriate pooled session
*/
- PooledSession createPubSubSession(boolean producer,
- boolean consumer,
- Destination destination) throws JMSException {
+ PooledSession createPubSubSession(boolean producer, boolean consumer, Destination destination)
+ throws JMSException {
TopicSession session = ((TopicConnection)theConnection).createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = null;
@@ -471,23 +429,16 @@
String messageSelector = runtimePolicy.getMessageSelector();
String durableName = runtimePolicy.getDurableSubscriberName();
if (durableName != null) {
- sub = session.createDurableSubscriber((Topic)destination,
- durableName,
- messageSelector,
- false);
+ sub = session
+ .createDurableSubscriber((Topic)destination, durableName, messageSelector, false);
} else {
- sub = session.createSubscriber((Topic)destination,
- messageSelector,
- false);
+ sub = session.createSubscriber((Topic)destination, messageSelector, false);
}
}
- return new PooledSession(session,
- null,
- producer ? session.createPublisher(null) : null,
- sub);
+ return new PooledSession(session, null, producer ? session.createPublisher(null) : null, sub);
}
-
+
private String generateUniqueSelector(Object obj) {
String host = "localhost";
@@ -495,17 +446,10 @@
InetAddress addr = InetAddress.getLocalHost();
host = addr.getHostName();
} catch (UnknownHostException ukex) {
- //Default to localhost.
+ // Default to localhost.
}
long time = Calendar.getInstance().getTimeInMillis();
- return host + "_"
- + System.getProperty("user.name") + "_"
- + obj + time;
- }
-
- private boolean isDestinationStyleQueue() {
- return JMSConstants.JMS_QUEUE.equals(
- jmsTransport.getJMSAddress().getDestinationStyle().value());
+ return host + "_" + System.getProperty("user.name") + "_" + obj + time;
}
}