You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC

svn commit: r1220336 [5/8] - in /qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ jca/ jca/example/ jca/example/conf/ jca/example/src/ jca/example/src/main/ jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageProducer.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,419 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QpidRAMessageProducer.
+ *
+ */
+public class QpidRAMessageProducer implements MessageProducer
+{
+   /** Logger used for trace output */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAMessageProducer.class);
+
+   /** The underlying producer every call is delegated to */
+   protected MessageProducer _producer;
+
+   /** The session this producer is associated with */
+   protected QpidRASessionImpl _session;
+
+   /**
+    * Wrap a producer on behalf of a session.
+    * @param producer the producer to delegate to
+    * @param session the session used for locking, state checks and removal on close
+    */
+   public QpidRAMessageProducer(final MessageProducer producer, final QpidRASessionImpl session)
+   {
+      _producer = producer;
+      _session = session;
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("new QpidRAMessageProducer " + this
+                    + " producer=" + Util.asString(producer)
+                    + " session=" + session);
+      }
+   }
+
+   /**
+    * Close the wrapped producer and remove this wrapper from its session.
+    * The removal happens even when closing the wrapped producer fails.
+    * @exception JMSException Thrown if closing the wrapped producer fails
+    */
+   public void close() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("close " + this);
+      }
+
+      try
+      {
+         closeProducer();
+      }
+      finally
+      {
+         _session.removeProducer(this);
+      }
+   }
+
+   /**
+    * Send a message to an explicit destination with explicit delivery settings.
+    * The session lock is held for the whole call and the session state is
+    * checked before the message is passed to the wrapped producer.
+    * @param destination The destination to send to
+    * @param message The message to send
+    * @param deliveryMode The delivery mode to use
+    * @param priority The priority to use
+    * @param timeToLive The time to live to use
+    * @exception JMSException Thrown if the state check or the send fails
+    */
+   public void send(final Destination destination,
+                    final Message message,
+                    final int deliveryMode,
+                    final int priority,
+                    final long timeToLive) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("send " + this
+                       + " destination=" + destination
+                       + " message=" + Util.asString(message)
+                       + " deliveryMode=" + deliveryMode
+                       + " priority=" + priority
+                       + " ttl=" + timeToLive);
+         }
+
+         checkState();
+
+         _producer.send(destination, message, deliveryMode, priority, timeToLive);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("sent " + this + " result=" + Util.asString(message));
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Send a message to an explicit destination using the producer's own
+    * delivery settings. Locking and state checking are as for the other
+    * send variants.
+    * @param destination The destination to send to
+    * @param message The message to send
+    * @exception JMSException Thrown if the state check or the send fails
+    */
+   public void send(final Destination destination, final Message message) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("send " + this
+                       + " destination=" + destination
+                       + " message=" + Util.asString(message));
+         }
+
+         checkState();
+
+         _producer.send(destination, message);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("sent " + this + " result=" + Util.asString(message));
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Send a message with explicit delivery settings, leaving the choice of
+    * destination to the wrapped producer. Locking and state checking are as
+    * for the other send variants.
+    * @param message The message to send
+    * @param deliveryMode The delivery mode to use
+    * @param priority The priority to use
+    * @param timeToLive The time to live to use
+    * @exception JMSException Thrown if the state check or the send fails
+    */
+   public void send(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("send " + this
+                       + " message=" + Util.asString(message)
+                       + " deliveryMode=" + deliveryMode
+                       + " priority=" + priority
+                       + " ttl=" + timeToLive);
+         }
+
+         checkState();
+
+         _producer.send(message, deliveryMode, priority, timeToLive);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("sent " + this + " result=" + Util.asString(message));
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Send a message, leaving both the destination and the delivery settings
+    * to the wrapped producer. Locking and state checking are as for the
+    * other send variants.
+    * @param message The message to send
+    * @exception JMSException Thrown if the state check or the send fails
+    */
+   public void send(final Message message) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("send " + this + " message=" + Util.asString(message));
+         }
+
+         checkState();
+
+         _producer.send(message);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("sent " + this + " result=" + Util.asString(message));
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Read the delivery mode from the wrapped producer.
+    * @return The delivery mode reported by the wrapped producer
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public int getDeliveryMode() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDeliveryMode()");
+      }
+
+      return _producer.getDeliveryMode();
+   }
+
+   /**
+    * Read the destination from the wrapped producer.
+    * @return The destination reported by the wrapped producer
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public Destination getDestination() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDestination()");
+      }
+
+      return _producer.getDestination();
+   }
+
+   /**
+    * Read the "disable message id" setting from the wrapped producer.
+    * @return The setting reported by the wrapped producer
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public boolean getDisableMessageID() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDisableMessageID()");
+      }
+
+      return _producer.getDisableMessageID();
+   }
+
+   /**
+    * Read the "disable message timestamp" setting from the wrapped producer.
+    * @return The setting reported by the wrapped producer
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public boolean getDisableMessageTimestamp() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDisableMessageTimestamp()");
+      }
+
+      return _producer.getDisableMessageTimestamp();
+   }
+
+   /**
+    * Read the priority from the wrapped producer.
+    * @return The priority reported by the wrapped producer
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public int getPriority() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPriority()");
+      }
+
+      return _producer.getPriority();
+   }
+
+   /**
+    * Read the time to live from the wrapped producer.
+    * @return The time to live reported by the wrapped producer
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public long getTimeToLive() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTimeToLive()");
+      }
+
+      return _producer.getTimeToLive();
+   }
+
+   /**
+    * Pass a new delivery mode to the wrapped producer.
+    * @param deliveryMode The delivery mode to set
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public void setDeliveryMode(final int deliveryMode) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDeliveryMode(" + deliveryMode + ")");
+      }
+
+      _producer.setDeliveryMode(deliveryMode);
+   }
+
+   /**
+    * Pass a new "disable message id" setting to the wrapped producer.
+    * @param value The setting to apply
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public void setDisableMessageID(final boolean value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDisableMessageID(" + value + ")");
+      }
+
+      _producer.setDisableMessageID(value);
+   }
+
+   /**
+    * Pass a new "disable message timestamp" setting to the wrapped producer.
+    * @param value The setting to apply
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public void setDisableMessageTimestamp(final boolean value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDisableMessageTimestamp(" + value + ")");
+      }
+
+      _producer.setDisableMessageTimestamp(value);
+   }
+
+   /**
+    * Pass a new default priority to the wrapped producer.
+    * @param defaultPriority The priority to set
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public void setPriority(final int defaultPriority) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPriority(" + defaultPriority + ")");
+      }
+
+      _producer.setPriority(defaultPriority);
+   }
+
+   /**
+    * Pass a new time to live to the wrapped producer.
+    * @param timeToLive The time to live to set
+    * @exception JMSException Thrown if the wrapped producer fails
+    */
+   public void setTimeToLive(final long timeToLive) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTimeToLive(" + timeToLive + ")");
+      }
+
+      _producer.setTimeToLive(timeToLive);
+   }
+
+   /**
+    * Ask the session to check its state.
+    * @exception JMSException Thrown if the session's state check fails
+    */
+   void checkState() throws JMSException
+   {
+      _session.checkState();
+   }
+
+   /**
+    * Close the wrapped producer only; the session is not touched here.
+    * @exception JMSException Thrown if the wrapped producer fails to close
+    */
+   void closeProducer() throws JMSException
+   {
+      _producer.close();
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMetaData.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.resource.ResourceException;
+import javax.resource.spi.ManagedConnectionMetaData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Managed connection meta data
+ *
+ */
+public class QpidRAMetaData implements ManagedConnectionMetaData
+{
+   /** Logger used for trace output */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAMetaData.class);
+
+   /** Product name reported by {@link #getEISProductName()} */
+   private static final String EIS_PRODUCT_NAME = "Qpid";
+
+   /** Product version reported by {@link #getEISProductVersion()} */
+   private static final String EIS_PRODUCT_VERSION = "2.0";
+
+   /** The managed connection the user name is read from */
+   private final QpidRAManagedConnection _mc;
+
+   /**
+    * Create the meta data for a managed connection.
+    * @param mc The managed connection to describe
+    */
+   public QpidRAMetaData(final QpidRAManagedConnection mc)
+   {
+      this._mc = mc;
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + mc + ")");
+      }
+   }
+
+   /**
+    * Report the EIS product name, which is a fixed string.
+    * @return The product name
+    * @exception ResourceException Declared by the interface; not thrown here
+    */
+   public String getEISProductName() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getEISProductName()");
+      }
+
+      return EIS_PRODUCT_NAME;
+   }
+
+   /**
+    * Report the EIS product version, which is a fixed string.
+    * @return The product version
+    * @exception ResourceException Declared by the interface; not thrown here
+    */
+   public String getEISProductVersion() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getEISProductVersion()");
+      }
+
+      return EIS_PRODUCT_VERSION;
+   }
+
+   /**
+    * Report the user name held by the managed connection.
+    * @return The user name as returned by the managed connection
+    * @exception ResourceException Thrown if operation fails
+    */
+   public String getUserName() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUserName()");
+      }
+
+      return _mc.getUserName();
+   }
+
+   /**
+    * Report the maximum number of connections. This implementation always
+    * answers 0, the value the ManagedConnectionMetaData contract uses when
+    * no limit exists or the limit is unknown.
+    * @return Always 0
+    * @exception ResourceException Declared by the interface; not thrown here
+    */
+   public int getMaxConnections() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMaxConnections()");
+      }
+
+      return 0;
+   }
+
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAObjectMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message
+ *
+ */
+public class QpidRAObjectMessage extends QpidRAMessage implements ObjectMessage
+{
+   /** Logger used for trace output */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAObjectMessage.class);
+
+   /**
+    * Wrap an object message on behalf of a session.
+    * @param message the object message to delegate to
+    * @param session the session passed on to the superclass
+    */
+   public QpidRAObjectMessage(final ObjectMessage message, final QpidRASessionImpl session)
+   {
+      super(message, session);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Read the payload from the wrapped object message.
+    * @return The object held by the wrapped message
+    * @exception JMSException Thrown if the wrapped message fails
+    */
+   public Serializable getObject() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getObject()");
+      }
+
+      return wrappedObjectMessage().getObject();
+   }
+
+   /**
+    * Store a payload in the wrapped object message.
+    * @param object The object to store
+    * @exception JMSException Thrown if the wrapped message fails
+    */
+   public void setObject(final Serializable object) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setObject(" + object + ")");
+      }
+
+      wrappedObjectMessage().setObject(object);
+   }
+
+   /**
+    * View the inherited message field as an object message.
+    * @return The wrapped message cast to ObjectMessage
+    */
+   private ObjectMessage wrappedObjectMessage()
+   {
+      return (ObjectMessage)_message;
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAProperties.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The RA default properties - these are set in the ra.xml file
+ *
+ */
+public class QpidRAProperties extends ConnectionFactoryProperties implements Serializable
+{
+   /** Serial version UID */
+   private static final long serialVersionUID = -4823893873707374791L;
+
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAProperties.class);
+
+   /** Default for the number of setup attempts */
+   private static final int DEFAULT_SETUP_ATTEMPTS = 10;
+
+   /** Default for the setup interval. NOTE(review): unit is presumably milliseconds -- confirm */
+   private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;
+
+   /** Number of setup attempts */
+   private int _setupAttempts = DEFAULT_SETUP_ATTEMPTS;
+
+   /** Setup interval, in the same unit as the default */
+   private long _setupInterval = DEFAULT_SETUP_INTERVAL;
+
+   /** Use Local TX instead of XA */
+   private Boolean _localTx = Boolean.FALSE;
+
+   /** Class used to locate the Transaction Manager. */
+   private String _transactionManagerLocatorClass ;
+
+   /** Method used to locate the TM */
+   private String _transactionManagerLocatorMethod ;
+
+
+   /**
+    * Constructor
+    */
+   public QpidRAProperties()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor()");
+      }
+   }
+
+   /**
+    * Get the use local TX flag (local transactions instead of XA)
+    * @return The value; may be null if null was passed to the setter
+    */
+   public Boolean getUseLocalTx()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUseLocalTx()");
+      }
+
+      return _localTx;
+   }
+
+   /**
+    * Set the use local TX flag (local transactions instead of XA)
+    * @param localTx The value; stored as given, without a null check
+    */
+   public void setUseLocalTx(final Boolean localTx)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setUseLocalTx(" + localTx + ")");
+      }
+
+      this._localTx = localTx;
+   }
+
+   /**
+    * Set the class used to locate the transaction manager
+    * @param transactionManagerLocatorClass The class name
+    */
+   public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTransactionManagerLocatorClass(" + transactionManagerLocatorClass + ")");
+      }
+
+      this._transactionManagerLocatorClass = transactionManagerLocatorClass;
+   }
+
+   /**
+    * Get the class used to locate the transaction manager
+    * @return The class name, or null if none has been set
+    */
+   public String getTransactionManagerLocatorClass()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransactionManagerLocatorClass()");
+      }
+
+      return _transactionManagerLocatorClass;
+   }
+
+   /**
+    * Get the method used to locate the transaction manager
+    * @return The method name, or null if none has been set
+    */
+   public String getTransactionManagerLocatorMethod()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransactionManagerLocatorMethod()");
+      }
+
+      return _transactionManagerLocatorMethod;
+   }
+
+   /**
+    * Set the method used to locate the transaction manager
+    * @param transactionManagerLocatorMethod The method name
+    */
+   public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTransactionManagerLocatorMethod(" + transactionManagerLocatorMethod + ")");
+      }
+
+      this._transactionManagerLocatorMethod = transactionManagerLocatorMethod;
+   }
+
+   /**
+    * Get the number of setup attempts
+    * @return The value; 10 unless changed through the setter
+    */
+   public int getSetupAttempts()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSetupAttempts()");
+      }
+
+      return _setupAttempts;
+   }
+
+   /**
+    * Set the number of setup attempts
+    * @param setupAttempts The value; stored as given, without range checking
+    */
+   public void setSetupAttempts(final int setupAttempts)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSetupAttempts(" + setupAttempts + ")");
+      }
+
+      this._setupAttempts = setupAttempts;
+   }
+
+   /**
+    * Get the setup interval
+    * @return The value; 2 * 1000 unless changed through the setter
+    */
+   public long getSetupInterval()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSetupInterval()");
+      }
+
+      return _setupInterval;
+   }
+
+   /**
+    * Set the setup interval
+    * @param setupInterval The value; stored as given, without range checking
+    */
+   public void setSetupInterval(final long setupInterval)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSetupInterval(" + setupInterval + ")");
+      }
+
+      this._setupInterval = setupInterval;
+   }
+
+   /**
+    * Describe the properties declared by this class; inherited properties
+    * are not included.
+    * @return The description
+    */
+   @Override
+   public String toString()
+   {
+      return "QpidRAProperties[localTx=" + _localTx +
+            ", transactionManagerLocatorClass=" + _transactionManagerLocatorClass +
+            ", transactionManagerLocatorMethod=" + _transactionManagerLocatorMethod +
+            ", setupAttempts=" + _setupAttempts +
+            ", setupInterval=" + _setupInterval + "]";
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueBrowser.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QpidRAQueueBrowser.
+ *
+ */
+public class QpidRAQueueBrowser implements QueueBrowser
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAQueueBrowser.class);
+
+   /** The wrapped queue browser */
+   protected QueueBrowser _browser;
+
+   /** The session for this browser */
+   protected QpidRASessionImpl _session;
+
+   /** The closed flag; final so the close-once guard in close() can never be swapped out */
+   private final AtomicBoolean _isClosed = new AtomicBoolean();
+
+   /**
+    * Create a new wrapper
+    * @param browser the browser
+    * @param session the session
+    */
+   public QpidRAQueueBrowser(final QueueBrowser browser, final QpidRASessionImpl session)
+   {
+      _browser = browser;
+      _session = session;
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("new QpidRAQueueBrowser " + this +
+                                            " browser=" +
+                                            Util.asString(browser) +
+                                            " session=" +
+                                            session);
+      }
+   }
+
+   /**
+    * Close the wrapped browser and remove this wrapper from its session.
+    * Only the first call does any work; later calls just trace. The removal
+    * from the session happens even if closing the wrapped browser fails.
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void close() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("close " + this);
+      }
+
+      // getAndSet flips the flag atomically, so exactly one caller sees false.
+      if (!_isClosed.getAndSet(true))
+      {
+         try
+         {
+            _browser.close();
+         }
+         finally
+         {
+            _session.removeQueueBrowser(this);
+         }
+      }
+   }
+
+   /**
+    * Get the queue associated with this browser.
+    * Delegates directly to the wrapped browser without locking the session.
+    * @return the associated queue.
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Queue getQueue()
+      throws JMSException
+   {
+      return _browser.getQueue();
+   }
+
+   /**
+    * Get the message selector associated with this browser.
+    * Delegates directly to the wrapped browser without locking the session.
+    * @return the associated message selector.
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getMessageSelector()
+      throws JMSException
+   {
+      return _browser.getMessageSelector();
+   }
+
+   /**
+    * Get an enumeration for the current browser.
+    * The session lock is held and the session state is checked while the
+    * enumeration is obtained from the wrapped browser.
+    * @return The enumeration.
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Enumeration getEnumeration()
+      throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getEnumeration " + this + " browser=" + Util.asString(_browser));
+      }
+
+      _session.lock();
+      try
+      {
+         _session.checkState();
+         return _browser.getEnumeration();
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueReceiver.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a queue receiver
+ *
+ */
+public class QpidRAQueueReceiver extends QpidRAMessageConsumer implements QueueReceiver
+{
+   /** Logger used for trace output */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAQueueReceiver.class);
+
+   /**
+    * Wrap a queue receiver on behalf of a session.
+    * @param consumer the queue receiver to delegate to
+    * @param session the session passed on to the superclass
+    */
+   public QpidRAQueueReceiver(final QueueReceiver consumer, final QpidRASessionImpl session)
+   {
+      super(consumer, session);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + Util.asString(consumer) + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Read the queue from the wrapped receiver, after the state check.
+    * @return The queue reported by the wrapped receiver
+    * @exception JMSException Thrown if the state check or the wrapped receiver fails
+    */
+   public Queue getQueue() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getQueue()");
+      }
+
+      checkState();
+
+      final QueueReceiver receiver = (QueueReceiver)_consumer;
+      return receiver.getQueue();
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAQueueSender.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QpidRAQueueSender.
+ *
+ */
+public class QpidRAQueueSender extends QpidRAMessageProducer implements QueueSender
+{
+   /** Class-wide trace logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAQueueSender.class);
+
+   /**
+    * Wrap the given queue sender
+    * @param producer the underlying queue sender, passed on to the superclass
+    * @param session the session, passed on to the superclass
+    */
+   public QpidRAQueueSender(final QueueSender producer, final QpidRASessionImpl session)
+   {
+      super(producer, session);
+
+      // Nothing else to do unless tracing is switched on.
+      if (!_log.isTraceEnabled())
+      {
+         return;
+      }
+      _log.trace("constructor(" + Util.asString(producer) + ", " + session + ")");
+   }
+
+   /**
+    * Return the queue reported by the wrapped sender
+    * @return The queue of the underlying {@link QueueSender}
+    * @exception JMSException Propagated from the wrapped sender
+    */
+   public Queue getQueue() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getQueue()");
+      }
+
+      // The wrapped producer was supplied as a QueueSender in the constructor.
+      final QueueSender sender = (QueueSender)_producer;
+      return sender.getQueue();
+   }
+
+   /**
+    * Send a message to the given queue with explicit delivery settings.
+    * The session lock is held for the duration of the call and always released.
+    * @param destination The destination
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Propagated from checkState() or from the wrapped producer
+    */
+   public void send(final Queue destination,
+                    final Message message,
+                    final int deliveryMode,
+                    final int priority,
+                    final long timeToLive) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("send " + this + " destination=" + destination + " message=" + Util.asString(message) +
+                       " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         }
+
+         checkState();
+         _producer.send(destination, message, deliveryMode, priority, timeToLive);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("sent " + this + " result=" + Util.asString(message));
+         }
+      }
+      finally
+      {
+         // Release the lock on every exit path, including failures.
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Send a message to the given queue using the producer's own delivery settings.
+    * The session lock is held for the duration of the call and always released.
+    * @param destination The destination
+    * @param message The message
+    * @exception JMSException Propagated from checkState() or from the wrapped producer
+    */
+   public void send(final Queue destination, final Message message) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("send " + this +
+                       " destination=" + destination +
+                       " message=" + Util.asString(message));
+         }
+
+         checkState();
+         _producer.send(destination, message);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("sent " + this + " result=" + Util.asString(message));
+         }
+      }
+      finally
+      {
+         // Release the lock on every exit path, including failures.
+         _session.unlock();
+      }
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASession.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+
+public interface QpidRASession
+{
+    /**
+     * Set the session factory for this session
+     * @param sf The session factory
+     */
+    void setQpidSessionFactory(QpidRASessionFactory sf);
+
+    /**
+     * Start the session
+     * @throws JMSException Thrown if an error occurs
+     */
+    void start() throws JMSException;
+
+    /**
+     * Close the session
+     * @throws JMSException Thrown if an error occurs
+     */
+    void close() throws JMSException;
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactory.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XATopicConnection;
+
+/**
+ * A joint interface for all connection types
+ *
+ */
+public interface QpidRASessionFactory extends Connection, TopicConnection, QueueConnection, XAConnection,
+         XATopicConnection, XAQueueConnection
+{
+   /** Error message for strict behaviour */
+   String ISE = "This method is not applicable inside the application server. See the J2EE spec, e.g. J2EE1.4 Section 6.6";
+
+   /**
+    * Add a temporary queue
+    * @param temp The temporary queue
+    */
+   void addTemporaryQueue(TemporaryQueue temp);
+
+   /**
+    * Add a temporary topic
+    * @param temp The temporary topic
+    */
+   void addTemporaryTopic(TemporaryTopic temp);
+
+   /**
+    * Notification that a session is closed
+    * @param session The session
+    * @throws JMSException for any error
+    */
+   void closeSession(QpidRASessionImpl session) throws JMSException;
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionFactoryImpl.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,911 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.naming.Reference;
+import javax.resource.Referenceable;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the JMS Connection API and produces {@link QpidRASessionImpl} objects.
+ *
+ */
+public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Referenceable
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRASessionFactoryImpl.class);
+
+   /** Are we closed? */
+   private boolean _closed = false;
+
+   /** The naming reference */
+   private Reference _reference;
+
+   /** The user name */
+   private String _userName;
+
+   /** The password */
+   private String _password;
+
+   /** The client ID */
+   private String _clientID;
+
+   /** The connection type */
+   private final int _type;
+
+   /** Whether we are started */
+   private boolean _started = false;
+
+   /** The managed connection factory */
+   private final QpidRAManagedConnectionFactory _mcf;
+
+   /** The connection manager */
+   private ConnectionManager _cm;
+
+   /** The sessions */
+   private final Set<QpidRASession> _sessions = new HashSet<QpidRASession>();
+
+   /** The temporary queues */
+   private final Set<TemporaryQueue> _tempQueues = new HashSet<TemporaryQueue>();
+
+   /** The temporary topics */
+   private final Set<TemporaryTopic> _tempTopics = new HashSet<TemporaryTopic>();
+
+   /**
+    * Constructor
+    * @param mcf The managed connection factory
+    * @param cm The connection manager
+    * @param type The connection type
+    */
+   public QpidRASessionFactoryImpl(final QpidRAManagedConnectionFactory mcf,
+                                      final ConnectionManager cm,
+                                      final int type)
+   {
+      this._mcf = mcf;
+
+      // Fall back to a new QpidRAConnectionManager when the caller supplies none.
+      if (cm == null)
+      {
+         this._cm = new QpidRAConnectionManager();
+      }
+      else
+      {
+         this._cm = cm;
+      }
+
+      this._type = type;
+
+      if (_log.isTraceEnabled())
+      {
+         // The closing parenthesis was missing, leaving this trace line unbalanced
+         // compared with every other trace message in the class.
+         _log.trace("constructor(" + mcf + ", " + cm + ", " + type + ")");
+      }
+   }
+
+   /**
+    * Set the naming reference
+    * @param reference The reference
+    */
+   public void setReference(final Reference reference)
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("setReference(" + reference + ")");
+      }
+
+      // Stored so that getReference() can hand it back.
+      _reference = reference;
+   }
+
+   /**
+    * Get the naming reference
+    * @return The reference
+    */
+   public Reference getReference()
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("getReference()");
+      }
+
+      // Whatever setReference() stored last; null if it was never called.
+      return this._reference;
+   }
+
+   /**
+    * Set the user name
+    * @param name The user name
+    */
+   public void setUserName(final String name)
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("setUserName(" + name + ")");
+      }
+
+      // Copied into the connection request info when a session is allocated.
+      this._userName = name;
+   }
+
+   /**
+    * Set the password
+    * @param password The password
+    */
+   public void setPassword(final String password)
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         // The password value itself is deliberately kept out of the log.
+         _log.trace("setPassword(****)");
+      }
+
+      _password = password;
+   }
+
+   /**
+    * Get the client ID
+    * @return The client ID
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getClientID() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getClientID()");
+      }
+
+      checkClosed();
+
+      // A client ID held by this factory takes precedence over the connection URL's client name.
+      if (_clientID != null)
+      {
+         return _clientID;
+      }
+
+      try
+      {
+         return _mcf.getDefaultAMQConnectionFactory().getConnectionURL().getClientName();
+      }
+      catch (final ResourceException cause)
+      {
+         // Translate into the JMS exception type this method declares, keeping the cause.
+         final JMSException failure = new JMSException("Unexpected exception obtaining resource");
+         failure.initCause(cause);
+         throw failure;
+      }
+   }
+
+   /**
+    * Set the client ID -- throws IllegalStateException
+    * @param cID The client ID
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setClientID(final String cID) throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("setClientID(" + cID + ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Create a queue session
+    * @param transacted Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @return The queue session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createQueueSession(" + transacted + ", " + acknowledgeMode + ")");
+      }
+
+      checkClosed();
+
+      // Topic-only connection types cannot hand out queue sessions.
+      final boolean topicOnly = _type == QpidRAConnectionFactory.TOPIC_CONNECTION ||
+                                _type == QpidRAConnectionFactory.XA_TOPIC_CONNECTION;
+      if (topicOnly)
+      {
+         throw new IllegalStateException("Can not get a queue session from a topic connection");
+      }
+
+      final QpidRASession allocated = allocateConnection(transacted, acknowledgeMode, _type);
+      return (QueueSession)allocated;
+   }
+
+   /**
+    * Create a XA queue session
+    * @return The XA queue session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public XAQueueSession createXAQueueSession() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createXAQueueSession()");
+      }
+
+      checkClosed();
+
+      // Plain and topic-only connection types cannot hand out XA queue sessions.
+      if (_type == QpidRAConnectionFactory.CONNECTION || _type == QpidRAConnectionFactory.TOPIC_CONNECTION ||
+          _type == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         // The message previously read "topic session from a queue connection", copied from the
+         // topic variant; this method is the one that refuses queue sessions.
+         throw new IllegalStateException("Can not get a queue session from a topic connection");
+      }
+
+      return (XAQueueSession) allocateConnection(_type);
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    * @param queue The queue
+    * @param messageSelector The message selector
+    * @param sessionPool The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Queue queue,
+                                                      final String messageSelector,
+                                                      final ServerSessionPool sessionPool,
+                                                      final int maxMessages) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createConnectionConsumer(" + queue + ", " + messageSelector + ", " +
+                    sessionPool + ", " + maxMessages + ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Create a topic session
+    * @param transacted Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @return The topic session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createTopicSession(" + transacted + ", " + acknowledgeMode + ")");
+      }
+
+      checkClosed();
+
+      // Queue-only connection types cannot hand out topic sessions.
+      final boolean queueOnly = _type == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                _type == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (queueOnly)
+      {
+         throw new IllegalStateException("Can not get a topic session from a queue connection");
+      }
+
+      final QpidRASession allocated = allocateConnection(transacted, acknowledgeMode, _type);
+      return (TopicSession) allocated;
+   }
+
+   /**
+    * Create a XA topic session
+    * @return The XA topic session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public XATopicSession createXATopicSession() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createXATopicSession()");
+      }
+
+      checkClosed();
+
+      // Plain and queue-only connection types cannot hand out XA topic sessions.
+      final boolean unsupported = _type == QpidRAConnectionFactory.CONNECTION ||
+                                  _type == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                  _type == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (unsupported)
+      {
+         throw new IllegalStateException("Can not get a topic session from a queue connection");
+      }
+
+      final QpidRASession allocated = allocateConnection(_type);
+      return (XATopicSession) allocated;
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    * @param topic The topic
+    * @param messageSelector The message selector
+    * @param sessionPool The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Topic topic,
+                                                      final String messageSelector,
+                                                      final ServerSessionPool sessionPool,
+                                                      final int maxMessages) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createConnectionConsumer(" + topic + ", " + messageSelector + ", " +
+                    sessionPool + ", " + maxMessages + ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Create a durable connection consumer -- throws IllegalStateException
+    * @param topic The topic
+    * @param subscriptionName The subscription name
+    * @param messageSelector The message selector
+    * @param sessionPool The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
+                                                             final String subscriptionName,
+                                                             final String messageSelector,
+                                                             final ServerSessionPool sessionPool,
+                                                             final int maxMessages) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         // The trace used to name createConnectionConsumer, which made durable calls
+         // indistinguishable from the non-durable overloads in the log.
+         _log.trace("createDurableConnectionConsumer(" + topic +
+                                               ", " +
+                                               subscriptionName +
+                                               ", " +
+                                               messageSelector +
+                                               ", " +
+                                               sessionPool +
+                                               ", " +
+                                               maxMessages +
+                                               ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      throw new IllegalStateException(QpidRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    * @param destination The destination
+    * @param pool The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Destination destination,
+                                                      final ServerSessionPool pool,
+                                                      final int maxMessages) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createConnectionConsumer(" + destination + ", " + pool + ", " + maxMessages + ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    * @param destination The destination
+    * @param name The name
+    * @param pool The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Destination destination,
+                                                      final String name,
+                                                      final ServerSessionPool pool,
+                                                      final int maxMessages) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createConnectionConsumer(" + destination + ", " + name + ", " +
+                    pool + ", " + maxMessages + ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Create a session
+    * @param transacted Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @return The session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("createSession(" + transacted + ", " + acknowledgeMode + ")");
+      }
+
+      checkClosed();
+
+      // No connection-type restriction applies to generic sessions.
+      final QpidRASession allocated = allocateConnection(transacted, acknowledgeMode, _type);
+      return (Session) allocated;
+   }
+
+   /**
+    * Create a XA session
+    * @return The XA session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public XASession createXASession() throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("createXASession()");
+      }
+
+      checkClosed();
+
+      // No connection-type restriction is applied here.
+      final QpidRASession allocated = allocateConnection(_type);
+      return (XASession) allocated;
+   }
+
+   /**
+    * Get the connection metadata
+    * @return The connection metadata
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ConnectionMetaData getMetaData() throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("getMetaData()");
+      }
+
+      checkClosed();
+
+      // The metadata is supplied by the managed connection factory.
+      final ConnectionMetaData metaData = _mcf.getMetaData();
+      return metaData;
+   }
+
+   /**
+    * Get the exception listener -- throws IllegalStateException
+    * @return The exception listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ExceptionListener getExceptionListener() throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("getExceptionListener()");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Set the exception listener -- throws IllegalStateException
+    * @param listener The exception listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setExceptionListener(final ExceptionListener listener) throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("setExceptionListener(" + listener + ")");
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Start
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void start() throws JMSException
+   {
+      checkClosed();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("start() " + this);
+      }
+
+      synchronized (_sessions)
+      {
+         if (_started)
+         {
+            // Already started; a repeated call is a no-op.
+            return;
+         }
+         _started = true;
+
+         // start() is declared on QpidRASession, so the sessions are started through the
+         // interface; the former downcast to QpidRASessionImpl was unnecessary and would
+         // have failed for any other QpidRASession implementation.
+         for (final QpidRASession session : _sessions)
+         {
+            session.start();
+         }
+      }
+   }
+
+   /**
+    * Stop -- throws IllegalStateException
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void stop() throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("stop() " + this);
+      }
+
+      // Always rejected; the ISE message text gives the reason.
+      final IllegalStateException notApplicable = new IllegalStateException(QpidRASessionFactory.ISE);
+      throw notApplicable;
+   }
+
+   /**
+    * Close
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void close() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("close() " + this);
+      }
+
+      if (_closed)
+      {
+         // Closing twice is a no-op.
+         return;
+      }
+
+      _closed = true;
+
+      // Best-effort cleanup: a failure on one resource is traced and the remaining
+      // resources are still processed; every entry is removed from its set regardless.
+      synchronized (_sessions)
+      {
+         for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();)
+         {
+            // closeSession() is not part of QpidRASession, hence the downcast.
+            QpidRASessionImpl session = (QpidRASessionImpl)i.next();
+            try
+            {
+               session.closeSession();
+            }
+            catch (Throwable t)
+            {
+               _log.trace("Error closing session", t);
+            }
+            i.remove();
+         }
+      }
+
+      synchronized (_tempQueues)
+      {
+         for (Iterator<TemporaryQueue> i = _tempQueues.iterator(); i.hasNext();)
+         {
+            TemporaryQueue temp = i.next();
+            try
+            {
+               if (_log.isTraceEnabled())
+               {
+                  _log.trace("Closing temporary queue " + temp + " for " + this);
+               }
+               temp.delete();
+            }
+            catch (Throwable t)
+            {
+               _log.trace("Error deleting temporary queue", t);
+            }
+            i.remove();
+         }
+      }
+
+      synchronized (_tempTopics)
+      {
+         for (Iterator<TemporaryTopic> i = _tempTopics.iterator(); i.hasNext();)
+         {
+            TemporaryTopic temp = i.next();
+            try
+            {
+               if (_log.isTraceEnabled())
+               {
+                  _log.trace("Closing temporary topic " + temp + " for " + this);
+               }
+               temp.delete();
+            }
+            catch (Throwable t)
+            {
+               // Previously logged as "temporary queue", copied from the loop above.
+               _log.trace("Error deleting temporary topic", t);
+            }
+            i.remove();
+         }
+      }
+   }
+
+   /**
+    * Close session
+    * @param session The session
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void closeSession(final QpidRASessionImpl session) throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("closeSession(" + session + ")");
+      }
+
+      // Guarded by the same monitor that start(), close() and allocateConnection() use.
+      synchronized (_sessions)
+      {
+         _sessions.remove(session);
+      }
+   }
+
+   /**
+    * Add temporary queue
+    * @param temp The temporary queue
+    */
+   public void addTemporaryQueue(final TemporaryQueue temp)
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("addTemporaryQueue(" + temp + ")");
+      }
+
+      // Tracked so that close() can delete the queue later.
+      synchronized (_tempQueues)
+      {
+         _tempQueues.add(temp);
+      }
+   }
+
+   /**
+    * Add temporary topic
+    * @param temp The temporary topic
+    */
+   public void addTemporaryTopic(final TemporaryTopic temp)
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("addTemporaryTopic(" + temp + ")");
+      }
+
+      // Tracked so that close() can delete the topic later.
+      synchronized (_tempTopics)
+      {
+         _tempTopics.add(temp);
+      }
+   }
+
+   /**
+    * Allocate a connection
+    * @param sessionType The session type
+    * @return The session
+    * @exception JMSException Thrown if an error occurs
+    */
+   protected QpidRASession allocateConnection(final int sessionType) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("allocateConnection(" + sessionType + ")");
+      }
+
+      try
+      {
+         synchronized (_sessions)
+         {
+            if (!_sessions.isEmpty())
+            {
+               throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
+            }
+
+            // Describe the requested session to the connection manager.
+            QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(sessionType);
+            info.setUserName(_userName);
+            info.setPassword(_password);
+            info.setClientID(_clientID);
+            info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
+
+            if (_log.isTraceEnabled())
+            {
+               _log.trace("Allocating session for " + this + " with request info=" + info);
+            }
+
+            QpidRASession session = (QpidRASession)_cm.allocateConnection(_mcf, info);
+
+            try
+            {
+               if (_log.isTraceEnabled())
+               {
+                  _log.trace("Allocated  " + this + " session=" + session);
+               }
+
+               session.setQpidSessionFactory(this);
+
+               // A session allocated after start() must be started immediately.
+               if (_started)
+               {
+                  session.start();
+               }
+
+               _sessions.add(session);
+
+               return session;
+            }
+            catch (Throwable t)
+            {
+               // Setup failed after allocation: release the session before reporting the error.
+               try
+               {
+                  session.close();
+               }
+               catch (Throwable ignored)
+               {
+                  // Deliberately ignored so the original failure below is the one reported.
+               }
+               if (t instanceof Exception)
+               {
+                  throw (Exception)t;
+               }
+               else
+               {
+                  throw new RuntimeException("Unexpected error: ", t);
+               }
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         _log.error("Could not create session", e);
+
+         JMSException je = new JMSException("Could not create a session: " + e.getMessage());
+         // Record the failure both as the JMS linked exception and as the standard cause,
+         // so that stack traces and getCause() expose the underlying error.
+         je.setLinkedException(e);
+         je.initCause(e);
+         throw je;
+      }
+   }
+
+   /**
+    * Allocation a connection
+    * @param transacted Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @param sessionType The session type
+    * @return The session
+    * @exception JMSException Thrown if an error occurs
+    */
+   protected QpidRASession allocateConnection(final boolean transacted, int acknowledgeMode, final int sessionType) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("allocateConnection(" + transacted +
+                                               ", " +
+                                               acknowledgeMode +
+                                               ", " +
+                                               sessionType +
+                                               ")");
+      }
+
+      try
+      {
+         synchronized (_sessions)
+         {
+            if (_sessions.isEmpty() == false)
+            {
+               throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
+            }
+
+            if (transacted)
+            {
+               acknowledgeMode = Session.SESSION_TRANSACTED;
+            }
+
+            QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(transacted,
+                                                                                     acknowledgeMode,
+                                                                                     sessionType);
+            info.setUserName(_userName);
+            info.setPassword(_password);
+            info.setClientID(_clientID);
+            info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
+
+            if (_log.isTraceEnabled())
+            {
+               _log.trace("Allocating session for " + this + " with request info=" + info);
+            }
+
+            QpidRASession session = (QpidRASession)_cm.allocateConnection(_mcf, info);
+
+            try
+            {
+               if (_log.isTraceEnabled())
+               {
+                  _log.trace("Allocated  " + this + " session=" + session);
+               }
+
+               session.setQpidSessionFactory(this);
+
+               if (_started)
+               {
+                  session.start();
+               }
+
+               _sessions.add(session);
+
+               return session;
+            }
+            catch (Throwable t)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (Throwable ignored)
+               {
+               }
+               if (t instanceof Exception)
+               {
+                  throw (Exception)t;
+               }
+               else
+               {
+                  throw new RuntimeException("Unexpected error: ", t);
+               }
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         _log.error("Could not create session", e);
+
+         JMSException je = new JMSException("Could not create a session: " + e.getMessage());
+         je.setLinkedException(e);
+         throw je;
+      }
+   }
+
+   /**
+    * Guard that rejects further use once the {@code _closed} flag is set.
+    * @exception IllegalStateException Thrown if closed
+    */
+   protected void checkClosed() throws IllegalStateException
+   {
+      if (!_closed)
+      {
+         return;
+      }
+
+      throw new IllegalStateException("The connection is closed");
+   }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org