You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC
svn commit: r1220336 [5/8] - in /qpid/trunk/qpid/java: ./
client/src/main/java/org/apache/qpid/client/ jca/ jca/example/
jca/example/conf/ jca/example/src/ jca/example/src/main/
jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,419 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MessageProducer wrapper that forwards every call to the wrapped producer;
+ * each send variant runs under the session lock after a session state check.
+ */
+public class QpidRAMessageProducer implements MessageProducer
+{
+ /** The logger (used for trace output only) */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAMessageProducer.class);
+
+ /** The wrapped message producer that every call is delegated to */
+ protected MessageProducer _producer;
+
+ /** The session for this producer; supplies the lock, state check and removal hook */
+ protected QpidRASessionImpl _session;
+
+ /**
+ * Create a new wrapper and trace the creation when trace logging is enabled
+ * @param producer the producer to wrap
+ * @param session the session this wrapper works against
+ */
+ public QpidRAMessageProducer(final MessageProducer producer, final QpidRASessionImpl session)
+ {
+ this._producer = producer;
+ this._session = session;
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("new QpidRAMessageProducer " + this +
+ " producer=" +
+ Util.asString(producer) +
+ " session=" +
+ session);
+ }
+ }
+
+ /**
+ * Close the wrapped producer, then remove this wrapper from the session even if closing threw
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void close() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("close " + this);
+ }
+ try
+ {
+ closeProducer();
+ }
+ finally
+ {
+ _session.removeProducer(this);
+ }
+ }
+
+ /**
+ * Send message to an explicit destination with explicit delivery settings
+ * @param destination The destination
+ * @param message The message
+ * @param deliveryMode The delivery mode
+ * @param priority The priority
+ * @param timeToLive The time to live
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void send(final Destination destination,
+ final Message message,
+ final int deliveryMode,
+ final int priority,
+ final long timeToLive) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this +
+ " destination=" +
+ destination +
+ " message=" +
+ Util.asString(message) +
+ " deliveryMode=" +
+ deliveryMode +
+ " priority=" +
+ priority +
+ " ttl=" +
+ timeToLive);
+ }
+
+ checkState();
+
+ _producer.send(destination, message, deliveryMode, priority, timeToLive);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Send message to an explicit destination
+ * @param destination The destination
+ * @param message The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void send(final Destination destination, final Message message) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this + " destination=" + destination + " message=" + Util.asString(message));
+ }
+
+ checkState();
+
+ _producer.send(destination, message);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Send message with explicit delivery settings (no destination argument)
+ * @param message The message
+ * @param deliveryMode The delivery mode
+ * @param priority The priority
+ * @param timeToLive The time to live
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void send(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this +
+ " message=" +
+ Util.asString(message) +
+ " deliveryMode=" +
+ deliveryMode +
+ " priority=" +
+ priority +
+ " ttl=" +
+ timeToLive);
+ }
+
+ checkState();
+
+ _producer.send(message, deliveryMode, priority, timeToLive);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Send message through the wrapped producer's single-argument send
+ * @param message The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void send(final Message message) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this + " message=" + Util.asString(message));
+ }
+
+ checkState();
+
+ _producer.send(message);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Get the delivery mode of the wrapped producer
+ * @return The mode
+ * @exception JMSException Thrown if an error occurs
+ */
+ public int getDeliveryMode() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDeliveryMode()");
+ }
+
+ return _producer.getDeliveryMode();
+ }
+
+ /**
+ * Get the destination of the wrapped producer
+ * @return The destination
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Destination getDestination() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDestination()");
+ }
+
+ return _producer.getDestination();
+ }
+
+ /**
+ * Get the disable message id flag of the wrapped producer
+ * @return The flag value reported by the wrapped producer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public boolean getDisableMessageID() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDisableMessageID()");
+ }
+
+ return _producer.getDisableMessageID();
+ }
+
+ /**
+ * Get the disable message timestamp flag of the wrapped producer
+ * @return The flag value reported by the wrapped producer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public boolean getDisableMessageTimestamp() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDisableMessageTimestamp()");
+ }
+
+ return _producer.getDisableMessageTimestamp();
+ }
+
+ /**
+ * Get the priority of the wrapped producer
+ * @return The priority
+ * @exception JMSException Thrown if an error occurs
+ */
+ public int getPriority() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPriority()");
+ }
+
+ return _producer.getPriority();
+ }
+
+ /**
+ * Get the time to live of the wrapped producer
+ * @return The ttl
+ * @exception JMSException Thrown if an error occurs
+ */
+ public long getTimeToLive() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTimeToLive()");
+ }
+
+ return _producer.getTimeToLive();
+ }
+
+ /**
+ * Set the delivery mode on the wrapped producer
+ * @param deliveryMode The mode
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setDeliveryMode(final int deliveryMode) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setDeliveryMode(" + deliveryMode + ")");
+ }
+
+ _producer.setDeliveryMode(deliveryMode);
+ }
+
+ /**
+ * Set the disable message id flag on the wrapped producer
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setDisableMessageID(final boolean value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setDisableMessageID(" + value + ")");
+ }
+
+ _producer.setDisableMessageID(value);
+ }
+
+ /**
+ * Set the disable message timestamp flag on the wrapped producer
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setDisableMessageTimestamp(final boolean value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setDisableMessageTimestamp(" + value + ")");
+ }
+
+ _producer.setDisableMessageTimestamp(value);
+ }
+
+ /**
+ * Set the priority on the wrapped producer
+ * @param defaultPriority The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setPriority(final int defaultPriority) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPriority(" + defaultPriority + ")");
+ }
+
+ _producer.setPriority(defaultPriority);
+ }
+
+ /**
+ * Set the ttl on the wrapped producer
+ * @param timeToLive The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setTimeToLive(final long timeToLive) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTimeToLive(" + timeToLive + ")");
+ }
+
+ _producer.setTimeToLive(timeToLive);
+ }
+
+ /**
+ * Check state by delegating to the session's checkState()
+ * @exception JMSException Thrown if an error occurs
+ */
+ void checkState() throws JMSException
+ {
+ _session.checkState();
+ }
+
+ /**
+ * Close only the wrapped producer; the session is not touched here
+ * @exception JMSException Thrown if an error occurs
+ */
+ void closeProducer() throws JMSException
+ {
+ _producer.close();
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.resource.ResourceException;
+import javax.resource.spi.ManagedConnectionMetaData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Managed connection meta data: fixed product name, version and max connections,
+ * plus the user name taken from the managed connection
+ */
+public class QpidRAMetaData implements ManagedConnectionMetaData
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAMetaData.class);
+
+ /** The managed connection that supplies the user name */
+ private final QpidRAManagedConnection _mc;
+
+ /**
+ * Constructor; traces its argument when trace logging is enabled
+ * @param mc The managed connection
+ */
+ public QpidRAMetaData(final QpidRAManagedConnection mc)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + mc + ")");
+ }
+
+ this._mc = mc;
+ }
+
+ /**
+ * Get the EIS product name
+ * @return The constant name "Qpid"
+ * @exception ResourceException Thrown if operation fails
+ */
+ public String getEISProductName() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getEISProductName()");
+ }
+
+ return "Qpid";
+ }
+
+ /**
+ * Get the EIS product version
+ * @return The constant version "2.0"
+ * @exception ResourceException Thrown if operation fails
+ */
+ public String getEISProductVersion() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getEISProductVersion()");
+ }
+
+ return "2.0";
+ }
+
+ /**
+ * Get the user name by asking the managed connection
+ * @return The user name
+ * @exception ResourceException Thrown if operation fails
+ */
+ public String getUserName() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUserName()");
+ }
+
+ return _mc.getUserName();
+ }
+
+ /**
+ * Get the maximum number of connections -- RETURNS 0
+ * @return Always 0
+ * @exception ResourceException Thrown if operation fails
+ */
+ public int getMaxConnections() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMaxConnections()");
+ }
+
+ return 0;
+ }
+
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for an object message; getObject and setObject are forwarded
+ * to the wrapped message after casting it to ObjectMessage
+ */
+public class QpidRAObjectMessage extends QpidRAMessage implements ObjectMessage
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAObjectMessage.class);
+
+ /**
+ * Create a new wrapper; both arguments are handed to the superclass constructor
+ * @param message the message
+ * @param session the session
+ */
+ public QpidRAObjectMessage(final ObjectMessage message, final QpidRASessionImpl session)
+ {
+ super(message, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Get the object from the wrapped message
+ * @return The object
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Serializable getObject() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getObject()");
+ }
+
+ return ((ObjectMessage)_message).getObject();
+ }
+
+ /**
+ * Set the object on the wrapped message
+ * @param object The object
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setObject(final Serializable object) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setObject(" + object + ")");
+ }
+
+ ((ObjectMessage)_message).setObject(object);
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The RA default properties - these are set in the ra.xml file
+ * Adds local TX, transaction manager locator and setup attempts/interval settings
+ */
+public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable
+{
+ /** Serial version UID */
+ private static final long serialVersionUID = -4823893873707374791L;
+
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class);
+ /** Initial value of the setup attempts property */
+ private static final int DEFAULT_SETUP_ATTEMPTS = 10;
+ /** Initial value of the setup interval property (presumably milliseconds - TODO confirm) */
+ private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
+ /** Setup attempts; starts at DEFAULT_SETUP_ATTEMPTS */
+ private int _setupAttempts = DEFAULT_SETUP_ATTEMPTS;
+ /** Setup interval; starts at DEFAULT_SETUP_INTERVAL */
+ private long _setupInterval = DEFAULT_SETUP_INTERVAL;
+
+ /** Use Local TX instead of XA */
+ private Boolean _localTx = false;
+
+ /** Class used to locate the Transaction Manager. */
+ private String _transactionManagerLocatorClass ;
+
+ /** Method used to locate the TM */
+ private String _transactionManagerLocatorMethod ;
+
+
+ /**
+ * Constructor; only emits a trace message
+ */
+ public QpidRAProperties()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor()");
+ }
+ }
+
+ /**
+ * Get the use local TX flag
+ * @return The value
+ */
+ public Boolean getUseLocalTx()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUseLocalTx()");
+ }
+
+ return _localTx;
+ }
+
+ /**
+ * Set the use local TX flag
+ * @param localTx The value
+ */
+ public void setUseLocalTx(final Boolean localTx)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUseLocalTx(" + localTx + ")");
+ }
+
+ this._localTx = localTx;
+ }
+
+ public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionManagerLocatorClass(" + transactionManagerLocatorClass + ")");
+ }
+
+ this._transactionManagerLocatorClass = transactionManagerLocatorClass;
+ }
+
+ public String getTransactionManagerLocatorClass()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionManagerLocatorClass()");
+ }
+
+ return _transactionManagerLocatorClass;
+ }
+
+ public String getTransactionManagerLocatorMethod()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionManagerLocatorMethod()");
+ }
+
+ return _transactionManagerLocatorMethod;
+ }
+
+ public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionManagerLocatorMethod(" + transactionManagerLocatorMethod + ")");
+ }
+
+ this._transactionManagerLocatorMethod = transactionManagerLocatorMethod;
+ }
+
+ public int getSetupAttempts()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupAttempts()");
+ }
+
+ return _setupAttempts;
+ }
+
+ public void setSetupAttempts(int setupAttempts)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupAttempts(" + setupAttempts + ")");
+ }
+
+ this._setupAttempts = setupAttempts;
+ }
+
+ public long getSetupInterval()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupInterval()");
+ }
+
+ return _setupInterval;
+ }
+
+ public void setSetupInterval(long setupInterval)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupInterval(" + setupInterval + ")");
+ }
+
+ this._setupInterval = setupInterval;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "QpidRAProperties[localTx=" + _localTx +
+ ", transactionManagerLocatorClass=" + _transactionManagerLocatorClass +
+ ", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod +
+ ", setupAttempts=" + _setupAttempts +
+ ", setupInterval=" + _setupInterval + "]";
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QueueBrowser wrapper that delegates to a wrapped browser; close() does its
+ * work only once, and getEnumeration() runs under the session lock.
+ */
+public class QpidRAQueueBrowser implements QueueBrowser
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAQueueBrowser.class);
+
+ /** The wrapped queue browser */
+ protected QueueBrowser _browser;
+
+ /** The session for this browser; supplies the lock, state check and removal hook */
+ protected QpidRASessionImpl _session;
+
+ /** The closed flag; set on the first close() so later calls skip the close work */
+ private AtomicBoolean _isClosed = new AtomicBoolean();
+
+ /**
+ * Create a new wrapper and trace the creation when trace logging is enabled
+ * @param browser the browser
+ * @param session the session
+ */
+ public QpidRAQueueBrowser(final QueueBrowser browser, final QpidRASessionImpl session)
+ {
+ _browser = browser;
+ _session = session;
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("new QpidRAQueueBrowser " + this +
+ " browser=" +
+ Util.asString(browser) +
+ " session=" +
+ session);
+ }
+ }
+
+ /**
+ * Close the wrapped browser once, then remove this wrapper from the session even if closing threw
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void close() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("close " + this);
+ }
+
+ if (!_isClosed.getAndSet(true))
+ {
+ try
+ {
+ _browser.close();
+ }
+ finally
+ {
+ _session.removeQueueBrowser(this);
+ }
+ }
+ }
+
+ /**
+ * Get the queue associated with this browser, as reported by the wrapped browser.
+ * @return the associated queue.
+ */
+ public Queue getQueue()
+ throws JMSException
+ {
+ return _browser.getQueue();
+ }
+
+ /**
+ * Get the message selector associated with this browser, as reported by the wrapped browser.
+ * @return the associated message selector.
+ */
+ public String getMessageSelector()
+ throws JMSException
+ {
+ return _browser.getMessageSelector();
+ }
+
+ /**
+ * Get an enumeration from the wrapped browser, under the session lock and after a state check.
+ * @return The enumeration.
+ */
+ public Enumeration getEnumeration()
+ throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getEnumeration " + this + " browser=" + Util.asString(_browser));
+ }
+
+ _session.lock();
+ try
+ {
+ _session.checkState();
+ return _browser.getEnumeration();
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a queue receiver; adds getQueue() on top of
+ * QpidRAMessageConsumer by casting the wrapped consumer to QueueReceiver
+ */
+public class QpidRAQueueReceiver extends QpidRAMessageConsumer implements QueueReceiver
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAQueueReceiver.class);
+
+ /**
+ * Create a new wrapper; both arguments are handed to the superclass constructor
+ * @param consumer the queue receiver
+ * @param session the session
+ */
+ public QpidRAQueueReceiver(final QueueReceiver consumer, final QpidRASessionImpl session)
+ {
+ super(consumer, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(consumer) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Get queue from the wrapped receiver, after checkState() has passed
+ * @return The queue
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Queue getQueue() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getQueue()");
+ }
+
+ checkState();
+ return ((QueueReceiver)_consumer).getQueue();
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QueueSender wrapper built on QpidRAMessageProducer; adds getQueue() and the
+ * Queue-typed send overloads, which run under the session lock after a state check.
+ */
+public class QpidRAQueueSender extends QpidRAMessageProducer implements QueueSender
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAQueueSender.class);
+
+ /**
+ * Create a new wrapper; both arguments are handed to the superclass constructor
+ * @param producer the producer
+ * @param session the session
+ */
+ public QpidRAQueueSender(final QueueSender producer, final QpidRASessionImpl session)
+ {
+ super(producer, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(producer) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Get queue from the wrapped producer, cast to QueueSender
+ * @return The queue
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Queue getQueue() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getQueue()");
+ }
+
+ return ((QueueSender)_producer).getQueue();
+ }
+
+ /**
+ * Send message to an explicit queue with explicit delivery settings
+ * @param destination The destination
+ * @param message The message
+ * @param deliveryMode The delivery mode
+ * @param priority The priority
+ * @param timeToLive The time to live
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void send(final Queue destination,
+ final Message message,
+ final int deliveryMode,
+ final int priority,
+ final long timeToLive) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this +
+ " destination=" +
+ destination +
+ " message=" +
+ Util.asString(message) +
+ " deliveryMode=" +
+ deliveryMode +
+ " priority=" +
+ priority +
+ " ttl=" +
+ timeToLive);
+ }
+
+ checkState();
+ _producer.send(destination, message, deliveryMode, priority, timeToLive);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Send message to an explicit queue
+ * @param destination The destination
+ * @param message The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void send(final Queue destination, final Message message) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this + " destination=" + destination + " message=" + Util.asString(message));
+ }
+
+ checkState();
+ _producer.send(destination, message);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+
+/**
+ * Operations a session exposes to the session factory that allocated it.
+ */
+public interface QpidRASession
+{
+    /**
+     * Set the session factory this session belongs to
+     * @param sf The session factory
+     */
+    void setQpidSessionFactory(QpidRASessionFactory sf);
+
+    /**
+     * Start the session
+     * @exception JMSException Thrown if an error occurs
+     */
+    void start() throws JMSException;
+
+    /**
+     * Close the session
+     * @exception JMSException Thrown if an error occurs
+     */
+    void close() throws JMSException;
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XATopicConnection;
+
+/**
+ * A joint interface for all connection types
+ *
+ */
+public interface QpidRASessionFactory extends Connection, TopicConnection, QueueConnection, XAConnection,
+ XATopicConnection, XAQueueConnection
+{
+ /** Error message for strict behaviour */
+ String ISE = "This method is not applicable inside the application server. See the J2EE spec, e.g. J2EE1.4 Section 6.6";
+
+ /**
+ * Add a temporary queue
+ * @param temp The temporary queue
+ */
+ void addTemporaryQueue(TemporaryQueue temp);
+
+ /**
+ * Add a temporary topic
+ * @param temp The temporary topic
+ */
+ void addTemporaryTopic(TemporaryTopic temp);
+
+ /**
+ * Notification that a session is closed
+ * @param session The session
+ * @throws JMSException for any error
+ */
+ void closeSession(QpidRASessionImpl session) throws JMSException;
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,911 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.naming.Reference;
+import javax.resource.Referenceable;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the JMS Connection API and produces {@link QpidRASessionImpl} objects.
+ *
+ * Sessions are obtained through the {@link ConnectionManager}. Only one session may be
+ * open per connection at a time, and the methods that are not applicable inside an
+ * application server throw {@link IllegalStateException}.
+ */
+public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Referenceable
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRASessionFactoryImpl.class);
+
+ /** Are we closed? Set by close() and tested by checkClosed() */
+ private boolean _closed = false;
+
+ /** The naming reference */
+ private Reference _reference;
+
+ /** The user name, copied into the request info when a session is allocated */
+ private String _userName;
+
+ /** The password, copied into the request info when a session is allocated */
+ private String _password;
+
+ /** The client ID; nothing in this class assigns it, so getClientID() falls back to the connection URL's client name */
+ private String _clientID;
+
+ /** The connection type (one of the QpidRAConnectionFactory connection type constants) */
+ private final int _type;
+
+ /** Whether we are started; read and written while holding the _sessions monitor */
+ private boolean _started = false;
+
+ /** The managed connection factory */
+ private final QpidRAManagedConnectionFactory _mcf;
+
+ /** The connection manager used to allocate sessions */
+ private ConnectionManager _cm;
+
+ /** The sessions; also used as the monitor guarding session bookkeeping */
+ private final Set<QpidRASession> _sessions = new HashSet<QpidRASession>();
+
+ /** The temporary queues, deleted when the connection is closed */
+ private final Set<TemporaryQueue> _tempQueues = new HashSet<TemporaryQueue>();
+
+ /** The temporary topics, deleted when the connection is closed */
+ private final Set<TemporaryTopic> _tempTopics = new HashSet<TemporaryTopic>();
+
+    /**
+     * Constructor
+     * @param mcf The managed connection factory
+     * @param cm The connection manager; when null a new {@link QpidRAConnectionManager} is used
+     * @param type The connection type
+     */
+    public QpidRASessionFactoryImpl(final QpidRAManagedConnectionFactory mcf,
+                                    final ConnectionManager cm,
+                                    final int type)
+    {
+        this._mcf = mcf;
+        this._cm = (cm == null) ? new QpidRAConnectionManager() : cm;
+        this._type = type;
+
+        if (_log.isTraceEnabled())
+        {
+            // The closing parenthesis was missing, leaving the trace entry unbalanced.
+            _log.trace("constructor(" + mcf + ", " + cm + ", " + type + ")");
+        }
+    }
+
+    /**
+     * Store the naming reference for this connection
+     * @param reference The reference
+     */
+    public void setReference(final Reference reference)
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "setReference(" + reference + ")";
+            _log.trace(entry);
+        }
+
+        _reference = reference;
+    }
+
+    /**
+     * Return the naming reference previously stored with setReference
+     * @return The reference
+     */
+    public Reference getReference()
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getReference()");
+        }
+
+        return this._reference;
+    }
+
+    /**
+     * Store the user name that is passed along when a session is allocated
+     * @param name The user name
+     */
+    public void setUserName(final String name)
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "setUserName(" + name + ")";
+            _log.trace(entry);
+        }
+
+        this._userName = name;
+    }
+
+    /**
+     * Store the password that is passed along when a session is allocated.
+     * The value itself is never written to the log.
+     * @param password The password
+     */
+    public void setPassword(final String password)
+    {
+        if (_log.isTraceEnabled())
+        {
+            // Masked on purpose: only the fact that the setter was called is traced.
+            _log.trace("setPassword(****)");
+        }
+
+        _password = password;
+    }
+
+    /**
+     * Get the client ID. When no client ID is held by this connection the client name
+     * from the default connection factory's connection URL is returned instead.
+     * @return The client ID
+     * @exception JMSException Thrown if the connection is closed or the default cannot be obtained
+     */
+    public String getClientID() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getClientID()");
+        }
+
+        checkClosed();
+
+        if (_clientID != null)
+        {
+            return _clientID;
+        }
+
+        // Nothing set locally: fall back to the client name configured on the connection URL.
+        try
+        {
+            return _mcf.getDefaultAMQConnectionFactory().getConnectionURL().getClientName();
+        }
+        catch (final ResourceException re)
+        {
+            final JMSException failure = new JMSException("Unexpected exception obtaining resource");
+            failure.initCause(re);
+            throw failure;
+        }
+    }
+
+    /**
+     * Setting the client ID is not supported here -- always throws IllegalStateException
+     * @param cID The client ID
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public void setClientID(final String cID) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "setClientID(" + cID + ")";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Create a queue session. Rejected when this connection is a topic connection
+     * (plain or XA).
+     * @param transacted Use transactions
+     * @param acknowledgeMode The acknowledge mode
+     * @return The queue session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createQueueSession(" + transacted + ", " + acknowledgeMode + ")";
+            _log.trace(entry);
+        }
+
+        checkClosed();
+
+        final boolean topicOnly = _type == QpidRAConnectionFactory.TOPIC_CONNECTION
+                                  || _type == QpidRAConnectionFactory.XA_TOPIC_CONNECTION;
+        if (topicOnly)
+        {
+            throw new IllegalStateException("Can not get a queue session from a topic connection");
+        }
+
+        final QpidRASession allocated = allocateConnection(transacted, acknowledgeMode, _type);
+        return (QueueSession) allocated;
+    }
+
+    /**
+     * Create a XA queue session. Rejected when this connection's type is CONNECTION,
+     * TOPIC_CONNECTION or XA_TOPIC_CONNECTION.
+     * @return The XA queue session
+     * @exception JMSException Thrown if the connection is closed, is of an unsuitable type,
+     *            or the session cannot be allocated
+     */
+    public XAQueueSession createXAQueueSession() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("createXAQueueSession()");
+        }
+
+        checkClosed();
+
+        if (_type == QpidRAConnectionFactory.CONNECTION || _type == QpidRAConnectionFactory.TOPIC_CONNECTION ||
+            _type == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+        {
+            // The previous text was copied from the topic variant and described the opposite situation.
+            throw new IllegalStateException("Can not get an XA queue session from this type of connection");
+        }
+
+        return (XAQueueSession) allocateConnection(_type);
+    }
+
+    /**
+     * Connection consumers on a queue are not supported here -- always throws IllegalStateException
+     * @param queue The queue
+     * @param messageSelector The message selector
+     * @param sessionPool The session pool
+     * @param maxMessages The number of max messages
+     * @return Never returns normally
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public ConnectionConsumer createConnectionConsumer(final Queue queue,
+                                                       final String messageSelector,
+                                                       final ServerSessionPool sessionPool,
+                                                       final int maxMessages) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createConnectionConsumer(" + queue + ", " + messageSelector + ", "
+                                 + sessionPool + ", " + maxMessages + ")";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Create a topic session. Rejected when this connection is a queue connection
+     * (plain or XA).
+     * @param transacted Use transactions
+     * @param acknowledgeMode The acknowledge mode
+     * @return The topic session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createTopicSession(" + transacted + ", " + acknowledgeMode + ")";
+            _log.trace(entry);
+        }
+
+        checkClosed();
+
+        final boolean queueOnly = _type == QpidRAConnectionFactory.QUEUE_CONNECTION
+                                  || _type == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+        if (queueOnly)
+        {
+            throw new IllegalStateException("Can not get a topic session from a queue connection");
+        }
+
+        final QpidRASession allocated = allocateConnection(transacted, acknowledgeMode, _type);
+        return (TopicSession) allocated;
+    }
+
+    /**
+     * Create a XA topic session. Rejected when this connection's type is CONNECTION,
+     * QUEUE_CONNECTION or XA_QUEUE_CONNECTION.
+     * @return The XA topic session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public XATopicSession createXATopicSession() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("createXATopicSession()");
+        }
+
+        checkClosed();
+
+        final boolean unsuitable = _type == QpidRAConnectionFactory.CONNECTION
+                                   || _type == QpidRAConnectionFactory.QUEUE_CONNECTION
+                                   || _type == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+        if (unsuitable)
+        {
+            throw new IllegalStateException("Can not get a topic session from a queue connection");
+        }
+
+        final QpidRASession allocated = allocateConnection(_type);
+        return (XATopicSession) allocated;
+    }
+
+    /**
+     * Connection consumers on a topic are not supported here -- always throws IllegalStateException
+     * @param topic The topic
+     * @param messageSelector The message selector
+     * @param sessionPool The session pool
+     * @param maxMessages The number of max messages
+     * @return Never returns normally
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public ConnectionConsumer createConnectionConsumer(final Topic topic,
+                                                       final String messageSelector,
+                                                       final ServerSessionPool sessionPool,
+                                                       final int maxMessages) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createConnectionConsumer(" + topic + ", " + messageSelector + ", "
+                                 + sessionPool + ", " + maxMessages + ")";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Create a durable connection consumer -- throws IllegalStateException
+     * @param topic The topic
+     * @param subscriptionName The subscription name
+     * @param messageSelector The message selector
+     * @param sessionPool The session pool
+     * @param maxMessages The number of max messages
+     * @return Never returns normally
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
+                                                              final String subscriptionName,
+                                                              final String messageSelector,
+                                                              final ServerSessionPool sessionPool,
+                                                              final int maxMessages) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            // Trace under this method's own name; it used to be logged as createConnectionConsumer.
+            _log.trace("createDurableConnectionConsumer(" + topic +
+                       ", " +
+                       subscriptionName +
+                       ", " +
+                       messageSelector +
+                       ", " +
+                       sessionPool +
+                       ", " +
+                       maxMessages +
+                       ")");
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Connection consumers on a destination are not supported here -- always throws IllegalStateException
+     * @param destination The destination
+     * @param pool The session pool
+     * @param maxMessages The number of max messages
+     * @return Never returns normally
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public ConnectionConsumer createConnectionConsumer(final Destination destination,
+                                                       final ServerSessionPool pool,
+                                                       final int maxMessages) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createConnectionConsumer(" + destination + ", " + pool + ", " + maxMessages + ")";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Connection consumers on a destination are not supported here -- always throws IllegalStateException
+     * @param destination The destination
+     * @param name The name
+     * @param pool The session pool
+     * @param maxMessages The number of max messages
+     * @return Never returns normally
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public ConnectionConsumer createConnectionConsumer(final Destination destination,
+                                                       final String name,
+                                                       final ServerSessionPool pool,
+                                                       final int maxMessages) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createConnectionConsumer(" + destination + ", " + name + ", "
+                                 + pool + ", " + maxMessages + ")";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Create a session using this connection's own type
+     * @param transacted Use transactions
+     * @param acknowledgeMode The acknowledge mode
+     * @return The session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "createSession(" + transacted + ", " + acknowledgeMode + ")";
+            _log.trace(entry);
+        }
+
+        checkClosed();
+
+        final QpidRASession allocated = allocateConnection(transacted, acknowledgeMode, _type);
+        return (Session) allocated;
+    }
+
+    /**
+     * Create a XA session using this connection's own type
+     * @return The XA session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public XASession createXASession() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("createXASession()");
+        }
+
+        checkClosed();
+
+        final QpidRASession allocated = allocateConnection(_type);
+        return (XASession) allocated;
+    }
+
+    /**
+     * Get the connection metadata, as supplied by the managed connection factory
+     * @return The connection metadata
+     * @exception JMSException Thrown if an error occurs
+     */
+    public ConnectionMetaData getMetaData() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getMetaData()");
+        }
+
+        checkClosed();
+
+        final ConnectionMetaData metaData = _mcf.getMetaData();
+        return metaData;
+    }
+
+    /**
+     * Exception listeners are not supported here -- always throws IllegalStateException
+     * @return Never returns normally
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public ExceptionListener getExceptionListener() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "getExceptionListener()";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Exception listeners are not supported here -- always throws IllegalStateException
+     * @param listener The exception listener
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public void setExceptionListener(final ExceptionListener listener) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "setExceptionListener(" + listener + ")";
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Start the connection and every session currently registered with it.
+     * Calling this on an already started connection does nothing.
+     * @exception JMSException Thrown if the connection is closed or a session fails to start
+     */
+    public void start() throws JMSException
+    {
+        checkClosed();
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("start() " + this);
+        }
+
+        synchronized (_sessions)
+        {
+            if (_started)
+            {
+                return;
+            }
+            _started = true;
+
+            // QpidRASession already declares start(), so no downcast to the implementation is needed.
+            for (final QpidRASession session : _sessions)
+            {
+                session.start();
+            }
+        }
+    }
+
+    /**
+     * Stopping the connection is not supported here -- always throws IllegalStateException
+     * @exception JMSException Always thrown (as an IllegalStateException)
+     */
+    public void stop() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "stop() " + this;
+            _log.trace(entry);
+        }
+
+        throw new IllegalStateException(QpidRASessionFactory.ISE);
+    }
+
+    /**
+     * Close the connection: close every registered session, then delete every temporary
+     * queue and temporary topic created through it. Failures while closing or deleting
+     * individual items are logged and do not stop the remaining cleanup. Calling this
+     * on an already closed connection does nothing.
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void close() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("close() " + this);
+        }
+
+        if (_closed)
+        {
+            return;
+        }
+
+        _closed = true;
+
+        synchronized (_sessions)
+        {
+            for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();)
+            {
+                // closeSession() is only declared on the implementation class, hence the cast.
+                QpidRASessionImpl session = (QpidRASessionImpl)i.next();
+                try
+                {
+                    session.closeSession();
+                }
+                catch (Throwable t)
+                {
+                    _log.trace("Error closing session", t);
+                }
+                i.remove();
+            }
+        }
+
+        synchronized (_tempQueues)
+        {
+            for (Iterator<TemporaryQueue> i = _tempQueues.iterator(); i.hasNext();)
+            {
+                TemporaryQueue temp = i.next();
+                try
+                {
+                    if (_log.isTraceEnabled())
+                    {
+                        _log.trace("Closing temporary queue " + temp + " for " + this);
+                    }
+                    temp.delete();
+                }
+                catch (Throwable t)
+                {
+                    _log.trace("Error deleting temporary queue", t);
+                }
+                i.remove();
+            }
+        }
+
+        synchronized (_tempTopics)
+        {
+            for (Iterator<TemporaryTopic> i = _tempTopics.iterator(); i.hasNext();)
+            {
+                TemporaryTopic temp = i.next();
+                try
+                {
+                    if (_log.isTraceEnabled())
+                    {
+                        _log.trace("Closing temporary topic " + temp + " for " + this);
+                    }
+                    temp.delete();
+                }
+                catch (Throwable t)
+                {
+                    // Report the failure against the right kind of destination (was "queue").
+                    _log.trace("Error deleting temporary topic", t);
+                }
+                i.remove();
+            }
+        }
+    }
+
+    /**
+     * Forget a session that has been closed, removing it from the set of registered sessions
+     * @param session The session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void closeSession(final QpidRASessionImpl session) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "closeSession(" + session + ")";
+            _log.trace(entry);
+        }
+
+        synchronized (_sessions)
+        {
+            _sessions.remove(session);
+        }
+    }
+
+    /**
+     * Remember a temporary queue so that it can be deleted when the connection is closed
+     * @param temp The temporary queue
+     */
+    public void addTemporaryQueue(final TemporaryQueue temp)
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "addTemporaryQueue(" + temp + ")";
+            _log.trace(entry);
+        }
+
+        synchronized (_tempQueues)
+        {
+            _tempQueues.add(temp);
+        }
+    }
+
+    /**
+     * Remember a temporary topic so that it can be deleted when the connection is closed
+     * @param temp The temporary topic
+     */
+    public void addTemporaryTopic(final TemporaryTopic temp)
+    {
+        if (_log.isTraceEnabled())
+        {
+            final String entry = "addTemporaryTopic(" + temp + ")";
+            _log.trace(entry);
+        }
+
+        synchronized (_tempTopics)
+        {
+            _tempTopics.add(temp);
+        }
+    }
+
+ /**
+ * Allocation a connection
+ * @param sessionType The session type
+ * @return The session
+ * @exception JMSException Thrown if an error occurs
+ */
+ protected QpidRASession allocateConnection(final int sessionType) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("allocateConnection(" + sessionType + ")");
+ }
+
+ try
+ {
+ synchronized (_sessions)
+ {
+ if (_sessions.isEmpty() == false)
+ {
+ throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
+ }
+
+ QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(sessionType);
+ info.setUserName(_userName);
+ info.setPassword(_password);
+ info.setClientID(_clientID);
+ info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Allocating session for " + this + " with request info=" + info);
+ }
+
+ QpidRASession session = (QpidRASession)_cm.allocateConnection(_mcf, info);
+
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Allocated " + this + " session=" + session);
+ }
+
+ session.setQpidSessionFactory(this);
+
+ if (_started)
+ {
+ session.start();
+ }
+
+ _sessions.add(session);
+
+ return session;
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ if (t instanceof Exception)
+ {
+ throw (Exception)t;
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected error: ", t);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _log.error("Could not create session", e);
+
+ JMSException je = new JMSException("Could not create a session: " + e.getMessage());
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ /**
+ * Allocation a connection
+ * @param transacted Use transactions
+ * @param acknowledgeMode The acknowledge mode
+ * @param sessionType The session type
+ * @return The session
+ * @exception JMSException Thrown if an error occurs
+ */
+ protected QpidRASession allocateConnection(final boolean transacted, int acknowledgeMode, final int sessionType) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("allocateConnection(" + transacted +
+ ", " +
+ acknowledgeMode +
+ ", " +
+ sessionType +
+ ")");
+ }
+
+ try
+ {
+ synchronized (_sessions)
+ {
+ if (_sessions.isEmpty() == false)
+ {
+ throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
+ }
+
+ if (transacted)
+ {
+ acknowledgeMode = Session.SESSION_TRANSACTED;
+ }
+
+ QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(transacted,
+ acknowledgeMode,
+ sessionType);
+ info.setUserName(_userName);
+ info.setPassword(_password);
+ info.setClientID(_clientID);
+ info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Allocating session for " + this + " with request info=" + info);
+ }
+
+ QpidRASession session = (QpidRASession)_cm.allocateConnection(_mcf, info);
+
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Allocated " + this + " session=" + session);
+ }
+
+ session.setQpidSessionFactory(this);
+
+ if (_started)
+ {
+ session.start();
+ }
+
+ _sessions.add(session);
+
+ return session;
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ if (t instanceof Exception)
+ {
+ throw (Exception)t;
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected error: ", t);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _log.error("Could not create session", e);
+
+ JMSException je = new JMSException("Could not create a session: " + e.getMessage());
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+    /**
+     * Guard used by the public methods: fails once close() has been called
+     * @exception IllegalStateException Thrown if closed
+     */
+    protected void checkClosed() throws IllegalStateException
+    {
+        if (!_closed)
+        {
+            return;
+        }
+
+        throw new IllegalStateException("The connection is closed");
+    }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org