You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC

svn commit: r1220336 [6/8] - in /qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ jca/ jca/example/ jca/example/conf/ jca/example/src/ jca/example/src/main/ jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,1732 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ManagedConnection;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A joint implementation of the JMS session interfaces (plain, queue, topic and their XA variants)
+ *
+ */
+public class QpidRASessionImpl implements Session, QueueSession, TopicSession, XASession, XAQueueSession, XATopicSession, QpidRASession
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRASessionImpl.class);
+
+   /**
+    * The managed connection. Volatile; lock() snapshots it into a local and
+    * throws an IllegalStateException when it is null.
+    */
+   private volatile QpidRAManagedConnection _mc;
+   /** The managed connection locked by lock(); reset to null by unlock() */
+   private QpidRAManagedConnection _lockedMC;
+
+   /** The connection request info (supplies the transacted flag and the connection type) */
+   private final QpidRAConnectionRequestInfo _cri;
+
+   /** The session factory; null until setQpidSessionFactory() is called */
+   private QpidRASessionFactory _sf;
+
+   /** The message consumers (presumably populated through addConsumer() -- confirm) */
+   private final Set<MessageConsumer> _consumers;
+
+   /** The message producers (presumably populated through addProducer() -- confirm) */
+   private final Set<MessageProducer> _producers;
+
+   /** The queue browsers (presumably populated through addQueueBrowser() -- confirm) */
+   private final Set<QueueBrowser> _browsers;
+
+   /** Are we started; reset to false whenever the session factory is set */
+   private AtomicBoolean _started = new AtomicBoolean(false) ;
+
+   /**
+    * Constructor
+    * @param mc The managed connection
+    * @param cri The connection request info
+    */
+   public QpidRASessionImpl(final QpidRAManagedConnection mc, final QpidRAConnectionRequestInfo cri)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + mc + ", " + cri + ")");
+      }
+
+      // Tracking sets start out empty.
+      _browsers = new HashSet<QueueBrowser>();
+      _producers = new HashSet<MessageProducer>();
+      _consumers = new HashSet<MessageConsumer>();
+
+      // No session factory yet; it is supplied through setQpidSessionFactory().
+      _sf = null;
+      _cri = cri;
+      _mc = mc;
+   }
+
+   /**
+    * Set the session factory
+    * @param sf The session factory
+    */
+   public void setQpidSessionFactory(final QpidRASessionFactory sf)
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("setQpidSessionFactory(" + sf + ")");
+      }
+
+      // Clear the started flag before the new factory is stored.
+      _started.set(false);
+      _sf = sf;
+   }
+
+   /**
+    * Lock
+    * @exception JMSException Thrown if an error occurs
+    * @exception IllegalStateException The session is closed
+    */
+   protected void lock() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("lock()");
+      }
+
+      // Snapshot the volatile field so the null check and the lock act on the same instance.
+      final QpidRAManagedConnection current = _mc;
+      if (current == null)
+      {
+         throw new IllegalStateException("Connection is not associated with a managed connection. " + this);
+      }
+
+      current.tryLock();
+      // Remember which connection was locked so unlock() releases that one.
+      _lockedMC = current;
+   }
+
+   /**
+    * Unlock
+    */
+   protected void unlock()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("unlock()");
+      }
+
+      // We recreate the lock when returned to the pool
+      // so missing the unlock after disassociation is not important
+      if (_lockedMC == null)
+      {
+         return;
+      }
+
+      try
+      {
+         _lockedMC.unlock();
+      }
+      finally
+      {
+         // Forget the locked connection even if unlock() throws.
+         _lockedMC = null;
+      }
+   }
+
+   /**
+    * Create a bytes message
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public BytesMessage createBytesMessage() throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createBytesMessage" + Util.asString(delegate));
+      }
+
+      final BytesMessage message = delegate.createBytesMessage();
+      return message;
+   }
+
+   /**
+    * Create a map message
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public MapMessage createMapMessage() throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createMapMessage" + Util.asString(delegate));
+      }
+
+      final MapMessage message = delegate.createMapMessage();
+      return message;
+   }
+
+   /**
+    * Create a message
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message createMessage() throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createMessage" + Util.asString(delegate));
+      }
+
+      final Message message = delegate.createMessage();
+      return message;
+   }
+
+   /**
+    * Create an object message
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ObjectMessage createObjectMessage() throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createObjectMessage" + Util.asString(delegate));
+      }
+
+      final ObjectMessage message = delegate.createObjectMessage();
+      return message;
+   }
+
+   /**
+    * Create an object message
+    * @param object The object
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public ObjectMessage createObjectMessage(final Serializable object) throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createObjectMessage(" + object + ")" + Util.asString(delegate));
+      }
+
+      final ObjectMessage message = delegate.createObjectMessage(object);
+      return message;
+   }
+
+   /**
+    * Create a stream message
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public StreamMessage createStreamMessage() throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createStreamMessage" + Util.asString(delegate));
+      }
+
+      final StreamMessage message = delegate.createStreamMessage();
+      return message;
+   }
+
+   /**
+    * Create a text message
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TextMessage createTextMessage() throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createTextMessage" + Util.asString(delegate));
+      }
+
+      final TextMessage message = delegate.createTextMessage();
+      return message;
+   }
+
+   /**
+    * Create a text message
+    * @param string The text
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TextMessage createTextMessage(final String string) throws JMSException
+   {
+      // Resolve the underlying session before tracing.
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createTextMessage(" + string + ")" + Util.asString(delegate));
+      }
+
+      final TextMessage message = delegate.createTextMessage(string);
+      return message;
+   }
+
+   /**
+    * Get transacted
+    * @return True if transacted; otherwise false
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getTransacted() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransacted()");
+      }
+
+      // The returned session is unused; the lookup is kept for whatever
+      // checks getSessionInternal() performs (it may throw a JMSException).
+      getSessionInternal();
+
+      final boolean transacted = _cri.isTransacted();
+      return transacted;
+   }
+
+   /**
+    * Get the message listener -- throws IllegalStateException
+    * @return The message listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public MessageListener getMessageListener() throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("getMessageListener()");
+      }
+
+      // Not supported on this session: always rejected.
+      throw new IllegalStateException("Method not allowed");
+   }
+
+   /**
+    * Set the message listener -- Throws IllegalStateException
+    * @param listener The message listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setMessageListener(final MessageListener listener) throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("setMessageListener(" + listener + ")");
+      }
+
+      // Not supported on this session: always rejected.
+      throw new IllegalStateException("Method not allowed");
+   }
+
+   /**
+    * Always throws an Error.
+    * @exception Error Method not allowed.
+    */
+   public void run()
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("run()");
+      }
+
+      // Not supported on this session: always fails with an Error.
+      throw new Error("Method not allowed");
+   }
+
+   /**
+    * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
+    * managed connection.
+    * @exception JMSException Failed to close session.
+    */
+   public void close() throws JMSException
+   {
+      final boolean tracing = _log.isTraceEnabled();
+      if (tracing)
+      {
+         _log.trace("close()");
+      }
+
+      // Notify the session factory first, then run this session's own close logic.
+      _sf.closeSession(this);
+      this.closeSession();
+   }
+
+   /**
+    * Commit
+    * @exception JMSException Failed to commit session.
+    */
+   public void commit() throws JMSException
+   {
+      // Any XA connection type is rejected before the lock is taken.
+      final boolean xaConnection = _cri.getType() == QpidRAConnectionFactory.XA_CONNECTION ||
+                                   _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
+                                   _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION;
+      if (xaConnection)
+      {
+         throw new TransactionInProgressException("XA connection");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         // Only a transacted session may be committed.
+         if (!_cri.isTransacted())
+         {
+            throw new IllegalStateException("Session is not transacted");
+         }
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("Commit session " + this);
+         }
+
+         delegate.commit();
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Rollback
+    * @exception JMSException Failed to roll back session.
+    */
+   public void rollback() throws JMSException
+   {
+      // Any XA connection type is rejected before the lock is taken.
+      final boolean xaConnection = _cri.getType() == QpidRAConnectionFactory.XA_CONNECTION ||
+                                   _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
+                                   _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION;
+      if (xaConnection)
+      {
+         throw new TransactionInProgressException("XA connection");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         // Only a transacted session may be rolled back.
+         if (!_cri.isTransacted())
+         {
+            throw new IllegalStateException("Session is not transacted");
+         }
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("Rollback session " + this);
+         }
+
+         delegate.rollback();
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Recover
+    * @exception JMSException Failed to recover session.
+    */
+   public void recover() throws JMSException
+   {
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         // Recover is rejected on a transacted session.
+         final boolean transacted = _cri.isTransacted();
+         if (transacted)
+         {
+            throw new IllegalStateException("Session is transacted");
+         }
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("Recover session " + this);
+         }
+
+         delegate.recover();
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic
+    * @param topicName The topic name
+    * @return The topic
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Topic createTopic(final String topicName) throws JMSException
+   {
+      // Queue-only connection types cannot create topics.
+      final boolean queueOnly = _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (queueOnly)
+      {
+         throw new IllegalStateException("Cannot create topic for javax.jms.QueueSession");
+      }
+
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createTopic " + Util.asString(delegate) + " topicName=" + topicName);
+      }
+
+      final Topic created = delegate.createTopic(topicName);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createdTopic " + Util.asString(delegate) + " topic=" + created);
+      }
+
+      return created;
+   }
+
+   /**
+    * Create a topic subscriber
+    * @param topic The topic
+    * @return The subscriber
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+   {
+      lock();
+      try
+      {
+         final TopicSession delegate = getTopicSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createSubscriber " + Util.asString(delegate) + " topic=" + topic);
+         }
+
+         // Wrap the provider subscriber in the resource adapter's own type.
+         final TopicSubscriber wrapped = new QpidRATopicSubscriber(delegate.createSubscriber(topic), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdSubscriber " + Util.asString(delegate) + " QpidRATopicSubscriber=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic subscriber
+    * @param topic The topic
+    * @param messageSelector The message selector
+    * @param noLocal If true inhibits the delivery of messages published by its own connection
+    * @return The subscriber
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
+   {
+      lock();
+      try
+      {
+         final TopicSession delegate = getTopicSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createSubscriber " + Util.asString(delegate) + " topic=" + topic +
+                       " selector=" + messageSelector + " noLocal=" + noLocal);
+         }
+
+         // Wrap the provider subscriber in the resource adapter's own type.
+         final TopicSubscriber wrapped =
+            new QpidRATopicSubscriber(delegate.createSubscriber(topic, messageSelector, noLocal), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdSubscriber " + Util.asString(delegate) + " QpidRATopicSubscriber=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a durable topic subscriber
+    * @param topic The topic
+    * @param name The name
+    * @return The subscriber
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException
+   {
+      // Queue-only connection types cannot create durable subscribers.
+      final boolean queueOnly = _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (queueOnly)
+      {
+         throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createDurableSubscriber " + Util.asString(delegate) + " topic=" + topic + " name=" + name);
+         }
+
+         // Wrap the provider subscriber in the resource adapter's own type.
+         final TopicSubscriber wrapped = new QpidRATopicSubscriber(delegate.createDurableSubscriber(topic, name), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdDurableSubscriber " + Util.asString(delegate) + " QpidRATopicSubscriber=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a durable topic subscriber with a message selector
+    * @param topic The topic
+    * @param name The name
+    * @param messageSelector The message selector
+    * @param noLocal If true inhibits the delivery of messages published by its own connection
+    * @return The subscriber
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createDurableSubscriber(final Topic topic,
+                                                  final String name,
+                                                  final String messageSelector,
+                                                  final boolean noLocal) throws JMSException
+   {
+      // Queue-only connection types cannot create durable subscribers.
+      final boolean queueOnly = _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (queueOnly)
+      {
+         throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createDurableSubscriber " + Util.asString(delegate) + " topic=" + topic + " name=" + name +
+                       " selector=" + messageSelector + " noLocal=" + noLocal);
+         }
+
+         // Wrap the provider subscriber in the resource adapter's own type.
+         final TopicSubscriber wrapped =
+            new QpidRATopicSubscriber(delegate.createDurableSubscriber(topic, name, messageSelector, noLocal), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdDurableSubscriber " + Util.asString(delegate) + " QpidRATopicSubscriber=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic publisher
+    * @param topic The topic
+    * @return The publisher
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TopicPublisher createPublisher(final Topic topic) throws JMSException
+   {
+      lock();
+      try
+      {
+         final TopicSession delegate = getTopicSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createPublisher " + Util.asString(delegate) + " topic=" + topic);
+         }
+
+         // Wrap the provider publisher in the resource adapter's own type.
+         final TopicPublisher wrapped = new QpidRATopicPublisher(delegate.createPublisher(topic), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdPublisher " + Util.asString(delegate) + " publisher=" + wrapped);
+         }
+
+         addProducer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a temporary topic
+    * @return The temporary topic
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TemporaryTopic createTemporaryTopic() throws JMSException
+   {
+      // Queue-only connection types cannot create temporary topics.
+      final boolean queueOnly = _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (queueOnly)
+      {
+         throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createTemporaryTopic " + Util.asString(delegate));
+         }
+
+         final TemporaryTopic temporary = delegate.createTemporaryTopic();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdTemporaryTopic " + Util.asString(delegate) + " temp=" + temporary);
+         }
+
+         // Hand the new destination to the session factory.
+         _sf.addTemporaryTopic(temporary);
+
+         return temporary;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Unsubscribe
+    * @param name The name
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void unsubscribe(final String name) throws JMSException
+   {
+      // Queue-only connection types cannot unsubscribe.
+      final boolean queueOnly = _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION;
+      if (queueOnly)
+      {
+         throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("unsubscribe " + Util.asString(delegate) + " name=" + name);
+         }
+
+         delegate.unsubscribe(name);
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a browser
+    * @param queue The queue
+    * @return The browser
+    * @exception JMSException Thrown if an error occurs
+    */
+   public QueueBrowser createBrowser(final Queue queue) throws JMSException
+   {
+      // Topic-only connection types cannot browse queues.
+      if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
+      }
+
+      Session session = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue);
+      }
+
+      QueueBrowser result = session.createBrowser(queue);
+      // Wrap and register the browser the same way createBrowser(Queue, String) does;
+      // previously this overload returned the raw provider browser and never
+      // passed it to addQueueBrowser().
+      result = new QpidRAQueueBrowser(result, this);
+      addQueueBrowser(result);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
+      }
+
+      return result;
+   }
+
+   /**
+    * Create a browser
+    * @param queue The queue
+    * @param messageSelector The message selector
+    * @return The browser
+    * @exception JMSException Thrown if an error occurs
+    */
+   public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException
+   {
+      // Topic-only connection types cannot browse queues.
+      final boolean topicOnly = _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION;
+      if (topicOnly)
+      {
+         throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
+      }
+
+      final Session delegate = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createBrowser " + Util.asString(delegate) + " queue=" + queue + " selector=" + messageSelector);
+      }
+
+      // Wrap the provider browser and register it with this session.
+      final QueueBrowser wrapped = new QpidRAQueueBrowser(delegate.createBrowser(queue, messageSelector), this);
+      addQueueBrowser(wrapped);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createdBrowser " + Util.asString(delegate) + " browser=" + wrapped);
+      }
+
+      return wrapped;
+   }
+
+   /**
+    * Create a queue
+    * @param queueName The queue name
+    * @return The queue
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Queue createQueue(final String queueName) throws JMSException
+   {
+      // Topic-only connection types cannot create queues.
+      if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         // Message corrected: it previously read "Cannot create browser or
+         // javax.jms.TopicSession", copied from createBrowser().
+         throw new IllegalStateException("Cannot create queue for javax.jms.TopicSession");
+      }
+
+      Session session = getSessionInternal();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createQueue " + Util.asString(session) + " queueName=" + queueName);
+      }
+
+      Queue result = session.createQueue(queueName);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createdQueue " + Util.asString(session) + " queue=" + result);
+      }
+
+      return result;
+   }
+
+   /**
+    * Create a queue receiver
+    * @param queue The queue
+    * @return The queue receiver
+    * @exception JMSException Thrown if an error occurs
+    */
+   public QueueReceiver createReceiver(final Queue queue) throws JMSException
+   {
+      lock();
+      try
+      {
+         final QueueSession delegate = getQueueSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createReceiver " + Util.asString(delegate) + " queue=" + queue);
+         }
+
+         // Wrap the provider receiver in the resource adapter's own type.
+         final QueueReceiver wrapped = new QpidRAQueueReceiver(delegate.createReceiver(queue), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdReceiver " + Util.asString(delegate) + " receiver=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a queue receiver
+    * @param queue The queue
+    * @param messageSelector The message selector
+    * @return The queue receiver
+    * @exception JMSException Thrown if an error occurs
+    */
+   public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
+   {
+      lock();
+      try
+      {
+         final QueueSession delegate = getQueueSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createReceiver " + Util.asString(delegate) + " queue=" + queue + " selector=" + messageSelector);
+         }
+
+         // Wrap the provider receiver in the resource adapter's own type.
+         final QueueReceiver wrapped = new QpidRAQueueReceiver(delegate.createReceiver(queue, messageSelector), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdReceiver " + Util.asString(delegate) + " receiver=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a queue sender
+    * @param queue The queue
+    * @return The queue sender
+    * @exception JMSException Thrown if an error occurs
+    */
+   public QueueSender createSender(final Queue queue) throws JMSException
+   {
+      lock();
+      try
+      {
+         final QueueSession delegate = getQueueSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createSender " + Util.asString(delegate) + " queue=" + queue);
+         }
+
+         // Wrap the provider sender in the resource adapter's own type.
+         final QueueSender wrapped = new QpidRAQueueSender(delegate.createSender(queue), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdSender " + Util.asString(delegate) + " sender=" + wrapped);
+         }
+
+         addProducer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a temporary queue
+    * @return The temporary queue
+    * @exception JMSException Thrown if an error occurs
+    */
+   public TemporaryQueue createTemporaryQueue() throws JMSException
+   {
+      // Topic-only connection types cannot create temporary queues.
+      final boolean topicOnly = _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION ||
+                                _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION;
+      if (topicOnly)
+      {
+         throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
+      }
+
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createTemporaryQueue " + Util.asString(delegate));
+         }
+
+         final TemporaryQueue temporary = delegate.createTemporaryQueue();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdTemporaryQueue " + Util.asString(delegate) + " temp=" + temporary);
+         }
+
+         // Hand the new destination to the session factory.
+         _sf.addTemporaryQueue(temporary);
+
+         return temporary;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a message consumer
+    * @param destination The destination
+    * @return The message consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public MessageConsumer createConsumer(final Destination destination) throws JMSException
+   {
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createConsumer " + Util.asString(delegate) + " dest=" + destination);
+         }
+
+         // Wrap the provider consumer in the resource adapter's own type.
+         final MessageConsumer wrapped = new QpidRAMessageConsumer(delegate.createConsumer(destination), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdConsumer " + Util.asString(delegate) + " consumer=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a message consumer
+    * @param destination The destination
+    * @param messageSelector The message selector
+    * @return The message consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException
+   {
+      lock();
+      try
+      {
+         final Session delegate = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createConsumer " + Util.asString(delegate) + " dest=" + destination +
+                       " messageSelector=" + messageSelector);
+         }
+
+         // Wrap the provider consumer in the resource adapter's own type.
+         final MessageConsumer wrapped =
+            new QpidRAMessageConsumer(delegate.createConsumer(destination, messageSelector), this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdConsumer " + Util.asString(delegate) + " consumer=" + wrapped);
+         }
+
+         addConsumer(wrapped);
+
+         return wrapped;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Creates a consumer for the given destination, message selector and
+    * noLocal flag on the underlying session, wraps it in a
+    * {@link QpidRAMessageConsumer} bound to this session and registers the
+    * wrapper via {@link #addConsumer(MessageConsumer)}.
+    * The session lock is held for the whole operation.
+    * @param destination The destination to consume from
+    * @param messageSelector The message selector passed to the underlying session
+    * @param noLocal If true inhibits the delivery of messages published by its own connection
+    * @return The wrapped message consumer
+    * @exception JMSException Thrown if the session is closed or the consumer cannot be created
+    */
+   public MessageConsumer createConsumer(final Destination destination,
+                                         final String messageSelector,
+                                         final boolean noLocal) throws JMSException
+   {
+      lock();
+      try
+      {
+         final Session underlying = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createConsumer " + Util.asString(underlying) + " dest=" + destination +
+                       " messageSelector=" + messageSelector +
+                       " noLocal=" + noLocal);
+         }
+
+         // Create the real consumer first, then hide it behind the RA wrapper.
+         final MessageConsumer delegate = underlying.createConsumer(destination, messageSelector, noLocal);
+         final MessageConsumer wrapper = new QpidRAMessageConsumer(delegate, this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdConsumer " + Util.asString(underlying) + " consumer=" + wrapper);
+         }
+
+         // Track the wrapper so that it can be closed together with the session.
+         addConsumer(wrapper);
+
+         return wrapper;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Creates a producer for the given destination on the underlying session,
+    * wraps it in a {@link QpidRAMessageProducer} bound to this session and
+    * registers the wrapper via {@link #addProducer(MessageProducer)}.
+    * The session lock is held for the whole operation.
+    * @param destination The destination to produce to
+    * @return The wrapped message producer
+    * @exception JMSException Thrown if the session is closed or the producer cannot be created
+    */
+   public MessageProducer createProducer(final Destination destination) throws JMSException
+   {
+      lock();
+      try
+      {
+         final Session underlying = getSessionInternal();
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createProducer " + Util.asString(underlying) + " dest=" + destination);
+         }
+
+         // Create the real producer first, then hide it behind the RA wrapper.
+         final MessageProducer delegate = underlying.createProducer(destination);
+         final MessageProducer wrapper = new QpidRAMessageProducer(delegate, this);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("createdProducer " + Util.asString(underlying) + " producer=" + wrapper);
+         }
+
+         // Track the wrapper so that it can be closed together with the session.
+         addProducer(wrapper);
+
+         return wrapper;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Returns the acknowledge mode recorded in the connection request info,
+    * after verifying that the session is still open.
+    * @return The acknowledge mode
+    * @exception JMSException Thrown if the session is closed or cannot be obtained
+    */
+   public int getAcknowledgeMode() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getAcknowledgeMode()");
+      }
+
+      // Called only for its open-session check; the returned session is not needed.
+      getSessionInternal();
+
+      final int mode = _cri.getAcknowledgeMode();
+      return mode;
+   }
+
+   /**
+    * Get the XA resource of the managed connection backing this session.
+    * <p>
+    * This method never throws: it returns <code>null</code> when the
+    * connection type is one of the non XA types (CONNECTION,
+    * QUEUE_CONNECTION, TOPIC_CONNECTION), when the session lock cannot be
+    * acquired, or when the XA resource cannot be obtained. Failures are
+    * reported at trace level.
+    * @return The XA resource, or <code>null</code> if it is not available
+    */
+   public XAResource getXAResource()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getXAResource()");
+      }
+
+      if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+          _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+      {
+         return null;
+      }
+
+      // Acquire the lock outside the try/finally that releases it, so that
+      // unlock() is only ever called when lock() actually succeeded.
+      try
+      {
+         lock();
+      }
+      catch (Throwable t)
+      {
+         _log.trace("Unable to lock session in getXAResource()", t);
+         return null;
+      }
+
+      try
+      {
+         return getXAResourceInternal();
+      }
+      catch (Throwable t)
+      {
+         // The signature declares no exception, so keep the best-effort
+         // contract but leave a record of the failure instead of hiding it.
+         _log.trace("Unable to get XA resource in getXAResource()", t);
+         return null;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Returns this object as the session. Only permitted when the connection
+    * type is not one of CONNECTION, QUEUE_CONNECTION or TOPIC_CONNECTION.
+    * The session lock is acquired and released before returning.
+    * @return This session
+    * @exception JMSException Thrown if the lock cannot be obtained
+    * @exception IllegalStateException If non XA connection
+    */
+   public Session getSession() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSession()");
+      }
+
+      final boolean nonXA = _cri.getType() == QpidRAConnectionFactory.CONNECTION ||
+                            _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                            _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION;
+      if (nonXA)
+      {
+         throw new IllegalStateException("Non XA connection");
+      }
+
+      lock();
+      try
+      {
+         return this;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Returns this object as the queue session. Only permitted when the
+    * connection type is not one of CONNECTION, QUEUE_CONNECTION or
+    * TOPIC_CONNECTION. The session lock is acquired and released before
+    * returning.
+    * @return This session, as a queue session
+    * @exception JMSException Thrown if the lock cannot be obtained
+    * @exception IllegalStateException If non XA connection
+    */
+   public QueueSession getQueueSession() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getQueueSession()");
+      }
+
+      final boolean nonXA = _cri.getType() == QpidRAConnectionFactory.CONNECTION ||
+                            _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                            _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION;
+      if (nonXA)
+      {
+         throw new IllegalStateException("Non XA connection");
+      }
+
+      lock();
+      try
+      {
+         return this;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Returns this object as the topic session. Only permitted when the
+    * connection type is not one of CONNECTION, QUEUE_CONNECTION or
+    * TOPIC_CONNECTION. The session lock is acquired and released before
+    * returning.
+    * @return This session, as a topic session
+    * @exception JMSException Thrown if the lock cannot be obtained
+    * @exception IllegalStateException If non XA connection
+    */
+   public TopicSession getTopicSession() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTopicSession()");
+      }
+
+      final boolean nonXA = _cri.getType() == QpidRAConnectionFactory.CONNECTION ||
+                            _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+                            _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION;
+      if (nonXA)
+      {
+         throw new IllegalStateException("Non XA connection");
+      }
+
+      lock();
+      try
+      {
+         return this;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Associates this session handle with the given managed connection.
+    * If a managed connection is currently set, this handle is first removed
+    * from it.
+    * @param managedConnection The new managed connection
+    */
+   void setManagedConnection(final QpidRAManagedConnection managedConnection)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setManagedConnection(" + managedConnection + ")");
+      }
+
+      // Detach from the previous managed connection, if any, before switching.
+      final QpidRAManagedConnection previous = _mc;
+      if (previous != null)
+      {
+         previous.removeHandle(this);
+      }
+
+      _mc = managedConnection;
+   }
+
+   /**
+    * Returns the managed connection currently associated with this session.
+    * For tests only.
+    * @return The managed connection, or <code>null</code> if none is set
+    */
+   public ManagedConnection getManagedConnection()
+   {
+      return this._mc;
+   }
+
+   /**
+    * Destroys this handle by dropping its reference to the managed
+    * connection. The managed connection itself is not notified.
+    */
+   void destroy()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("destroy()");
+      }
+
+      this._mc = null;
+   }
+
+   /**
+    * Marks this session as started and starts the managed connection.
+    * Does nothing when no managed connection is associated.
+    * @exception JMSException Thrown if the managed connection fails to start
+    */
+   public void start() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("start()");
+      }
+
+      final QpidRAManagedConnection current = this._mc;
+      if (current == null)
+      {
+         return;
+      }
+
+      // The flag is raised before the managed connection is started.
+      _started.set(true);
+      current.start();
+   }
+
+   /**
+    * Stops the managed connection and then marks this session as stopped.
+    * Does nothing when no managed connection is associated.
+    * @exception JMSException Thrown if the managed connection fails to stop
+    */
+   void stop() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("stop()");
+      }
+
+      final QpidRAManagedConnection current = this._mc;
+      if (current == null)
+      {
+         return;
+      }
+
+      // The flag is only cleared once the managed connection stopped without error.
+      current.stop();
+      _started.set(false);
+   }
+
+   /**
+    * Check strict
+    * @exception JMSException Thrown if an error occurs
+    */
+   void checkStrict() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("checkStrict()");
+      }
+
+      if (_mc != null)
+      {
+         throw new IllegalStateException(QpidRASessionFactory.ISE);
+      }
+   }
+
+   /**
+    * Closes this session handle. Does nothing when no managed connection is
+    * associated. Otherwise, in order:
+    * <ol>
+    * <li>stops the managed connection (errors are only traced);</li>
+    * <li>closes and forgets every registered consumer, producer and queue
+    *     browser (errors on individual resources are only traced);</li>
+    * <li>removes this handle from the managed connection and sends it a
+    *     CONNECTION_CLOSED event carrying this handle;</li>
+    * <li>clears the reference to the managed connection.</li>
+    * </ol>
+    * @exception JMSException Declared by the signature
+    */
+   void closeSession() throws JMSException
+   {
+      final QpidRAManagedConnection mc = this._mc;
+      if (mc == null)
+      {
+         return;
+      }
+
+      _log.trace("Closing session");
+
+      try
+      {
+         mc.stop();
+      }
+      catch (Throwable t)
+      {
+         _log.trace("Error stopping managed connection", t);
+      }
+
+      synchronized (_consumers)
+      {
+         final Iterator<MessageConsumer> it = _consumers.iterator();
+         while (it.hasNext())
+         {
+            final QpidRAMessageConsumer consumer = (QpidRAMessageConsumer)it.next();
+            try
+            {
+               consumer.closeConsumer();
+            }
+            catch (Throwable t)
+            {
+               _log.trace("Error closing consumer", t);
+            }
+            // Forget the consumer whether or not it closed cleanly.
+            it.remove();
+         }
+      }
+
+      synchronized (_producers)
+      {
+         final Iterator<MessageProducer> it = _producers.iterator();
+         while (it.hasNext())
+         {
+            final QpidRAMessageProducer producer = (QpidRAMessageProducer)it.next();
+            try
+            {
+               producer.closeProducer();
+            }
+            catch (Throwable t)
+            {
+               _log.trace("Error closing producer", t);
+            }
+            // Forget the producer whether or not it closed cleanly.
+            it.remove();
+         }
+      }
+
+      synchronized (_browsers)
+      {
+         final Iterator<QueueBrowser> it = _browsers.iterator();
+         while (it.hasNext())
+         {
+            final QpidRAQueueBrowser browser = (QpidRAQueueBrowser)it.next();
+            try
+            {
+               browser.close();
+            }
+            catch (Throwable t)
+            {
+               _log.trace("Error closing browser", t);
+            }
+            // Forget the browser whether or not it closed cleanly.
+            it.remove();
+         }
+      }
+
+      // Detach from the managed connection and tell it that this handle is closed.
+      mc.removeHandle(this);
+      final ConnectionEvent closedEvent = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED);
+      closedEvent.setConnectionHandle(this);
+      mc.sendEvent(closedEvent);
+      this._mc = null;
+   }
+
+   /**
+    * Add consumer
+    * @param consumer The consumer
+    */
+   void addConsumer(final MessageConsumer consumer)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("addConsumer(" + consumer + ")");
+      }
+
+      synchronized (_consumers)
+      {
+         _consumers.add(consumer);
+      }
+   }
+
+   /**
+    * Remove consumer
+    * @param consumer The consumer
+    */
+   void removeConsumer(final MessageConsumer consumer)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("removeConsumer(" + consumer + ")");
+      }
+
+      synchronized (_consumers)
+      {
+         _consumers.remove(consumer);
+      }
+   }
+
+   /**
+    * Add producer
+    * @param producer The producer
+    */
+   void addProducer(final MessageProducer producer)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("addProducer(" + producer + ")");
+      }
+
+      synchronized (_producers)
+      {
+         _producers.add(producer);
+      }
+   }
+
+   /**
+    * Remove producer
+    * @param producer The producer
+    */
+   void removeProducer(final MessageProducer producer)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("removeProducer(" + producer + ")");
+      }
+
+      synchronized (_producers)
+      {
+         _producers.remove(producer);
+      }
+   }
+
+   /**
+    * Add queue browser
+    * @param browser The queue browser
+    */
+   void addQueueBrowser(final QueueBrowser browser)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("addQueueBrowser(" + browser + ")");
+      }
+
+      synchronized (_browsers)
+      {
+         _browsers.add(browser);
+      }
+   }
+
+   /**
+    * Remove queue browser
+    * @param browser The queue browser
+    */
+   void removeQueueBrowser(final QueueBrowser browser)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("removeQueueBrowser(" + browser + ")");
+      }
+
+      synchronized (_browsers)
+      {
+         _browsers.remove(browser);
+      }
+   }
+
+   /**
+    * Returns the session of the associated managed connection, failing if
+    * this handle no longer has a managed connection.
+    * @return The session obtained from the managed connection
+    * @exception JMSException Thrown if the managed connection cannot provide the session
+    * @exception IllegalStateException The session is closed (no managed connection is set)
+    */
+   Session getSessionInternal() throws JMSException
+   {
+      // Read the field once so the null check and the use see the same value.
+      final QpidRAManagedConnection current = this._mc;
+      if (current == null)
+      {
+         throw new IllegalStateException("The session is closed");
+      }
+
+      final Session underlying = current.getSession();
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSessionInternal " + Util.asString(underlying) + " for " + this);
+      }
+
+      return underlying;
+   }
+
+   /**
+    * Returns the XA resource of the associated managed connection, failing
+    * if this handle no longer has a managed connection.
+    * @return The XA resource obtained from the managed connection
+    * @exception JMSException Thrown, with the ResourceException as its cause,
+    *            if the managed connection cannot provide the XA resource
+    * @exception IllegalStateException The session is closed (no managed connection is set)
+    */
+   XAResource getXAResourceInternal() throws JMSException
+   {
+      // Read the field once so the null check and the use see the same value.
+      final QpidRAManagedConnection current = this._mc;
+      if (current == null)
+      {
+         throw new IllegalStateException("The session is closed");
+      }
+
+      final XAResource resource;
+      try
+      {
+         resource = current.getXAResource();
+      }
+      catch (ResourceException e)
+      {
+         // Translate the JCA failure into the JMS exception type callers expect.
+         final JMSException failure = new JMSException("Unable to get XA Resource");
+         failure.initCause(e);
+         throw failure;
+      }
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getXAResourceInternal " + resource + " for " + this);
+      }
+
+      return resource;
+   }
+
+   /**
+    * Get the queue session
+    * @return The queue session
+    * @exception JMSException Thrown if an error occurs
+    * @exception IllegalStateException The session is closed
+    */
+   QueueSession getQueueSessionInternal() throws JMSException
+   {
+      Session s = getSessionInternal();
+      if (!(s instanceof QueueSession))
+      {
+         throw new InvalidDestinationException("Attempting to use QueueSession methods on: " + this);
+      }
+      return (QueueSession)s;
+   }
+
+   /**
+    * Get the topic session
+    * @return The topic session
+    * @exception JMSException Thrown if an error occurs
+    * @exception IllegalStateException The session is closed
+    */
+   TopicSession getTopicSessionInternal() throws JMSException
+   {
+      Session s = getSessionInternal();
+      if (!(s instanceof TopicSession))
+      {
+         throw new InvalidDestinationException("Attempting to use TopicSession methods on: " + this);
+      }
+      return (TopicSession)s;
+   }
+
+   /**
+    * Asks the associated managed connection to check that a transaction is
+    * active. Does nothing when no managed connection is associated.
+    * @exception JMSException Thrown if the managed connection's check fails
+    */
+   public void checkState() throws JMSException
+   {
+      final QpidRAManagedConnection current = this._mc;
+      if (current == null)
+      {
+         return;
+      }
+
+      current.checkTransactionActive();
+   }
+
+   /**
+    * Reports the current value of the started flag, which {@link #start()}
+    * sets and <code>stop()</code> clears.
+    * @return true if started, false if stopped.
+    */
+   public boolean isStarted()
+   {
+      final boolean started = _started.get();
+      return started;
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,415 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message
+ *
+ */
+public class QpidRAStreamMessage extends QpidRAMessage implements StreamMessage
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAStreamMessage.class);
+
+   /**
+    * Create a new wrapper
+    * @param message the message
+    * @param session the session
+    */
+   public QpidRAStreamMessage(final StreamMessage message, final QpidRASessionImpl session)
+   {
+      super(message, session);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean readBoolean() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readBoolean()");
+      }
+
+      return ((StreamMessage)_message).readBoolean();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public byte readByte() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readByte()");
+      }
+
+      return ((StreamMessage)_message).readByte();
+   }
+
+   /**
+    * Read
+    * @param value The value
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int readBytes(final byte[] value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readBytes(" + value + ")");
+      }
+
+      return ((StreamMessage)_message).readBytes(value);
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public char readChar() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readChar()");
+      }
+
+      return ((StreamMessage)_message).readChar();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public double readDouble() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readDouble()");
+      }
+
+      return ((StreamMessage)_message).readDouble();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public float readFloat() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readFloat()");
+      }
+
+      return ((StreamMessage)_message).readFloat();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int readInt() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readInt()");
+      }
+
+      return ((StreamMessage)_message).readInt();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long readLong() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readLong()");
+      }
+
+      return ((StreamMessage)_message).readLong();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Object readObject() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readObject()");
+      }
+
+      return ((StreamMessage)_message).readObject();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public short readShort() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readShort()");
+      }
+
+      return ((StreamMessage)_message).readShort();
+   }
+
+   /**
+    * Read
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String readString() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("readString()");
+      }
+
+      return ((StreamMessage)_message).readString();
+   }
+
+   /**
+    * Reset
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void reset() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("reset()");
+      }
+
+      ((StreamMessage)_message).reset();
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeBoolean(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeBoolean(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeByte(final byte value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeByte(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeByte(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @param offset The offset
+    * @param length The length
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeBytes(" + value + ", " + offset + ", " + length + ")");
+      }
+
+      ((StreamMessage)_message).writeBytes(value, offset, length);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeBytes(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeBytes(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeChar(final char value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeChar(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeChar(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeDouble(final double value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeDouble(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeDouble(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeFloat(final float value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeFloat(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeFloat(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeInt(final int value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeInt(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeInt(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeLong(final long value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeLong(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeLong(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeObject(final Object value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeObject(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeObject(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeShort(final short value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeShort(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeShort(value);
+   }
+
+   /**
+    * Write
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeString(final String value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("writeString(" + value + ")");
+      }
+
+      ((StreamMessage)_message).writeString(value);
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a JMS {@link TextMessage}.
+ * <p>
+ * Both accessors trace their invocation (when trace logging is enabled) and
+ * then delegate to the wrapped message, which is held by the superclass in
+ * <code>_message</code> and cast back to {@link TextMessage}.
+ */
+public class QpidRATextMessage extends QpidRAMessage implements TextMessage
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRATextMessage.class);
+
+   /**
+    * Create a new wrapper
+    * @param message the text message to wrap
+    * @param session the session the wrapper belongs to
+    */
+   public QpidRATextMessage(final TextMessage message, final QpidRASessionImpl session)
+   {
+      super(message, session);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the text of the wrapped message
+    * @return The text
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getText() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getText()");
+      }
+
+      final TextMessage delegate = (TextMessage)_message;
+      return delegate.getText();
+   }
+
+   /**
+    * Set the text of the wrapped message
+    * @param string The text
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setText(final String string) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setText(" + string + ")");
+      }
+
+      final TextMessage delegate = (TextMessage)_message;
+      delegate.setText(string);
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JCA wrapper around a JMS {@link TopicPublisher}. Every publish variant runs
+ * between {@code _session.lock()} and {@code _session.unlock()}.
+ */
+public class QpidRATopicPublisher extends QpidRAMessageProducer implements TopicPublisher
+{
+   /** Trace logger for this wrapper */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRATopicPublisher.class);
+
+   /**
+    * Wrap the given topic publisher
+    * @param producer the topic publisher to wrap
+    * @param session the session passed on to the superclass
+    */
+   public QpidRATopicPublisher(final TopicPublisher producer, final QpidRASessionImpl session)
+   {
+      super(producer, session);
+
+      if (_log.isTraceEnabled())
+      {
+         final String arguments = Util.asString(producer) + ", " + session;
+         _log.trace("constructor(" + arguments + ")");
+      }
+   }
+
+   /**
+    * Ask the wrapped publisher for its topic
+    * @return The topic reported by the wrapped publisher
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Topic getTopic() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTopic()");
+      }
+
+      final TopicPublisher publisher = (TopicPublisher)_producer;
+      return publisher.getTopic();
+   }
+
+   /**
+    * Publish a message with explicit delivery settings
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            final StringBuilder trace = new StringBuilder("send ").append(this);
+            trace.append(" message=").append(Util.asString(message));
+            trace.append(" deliveryMode=").append(deliveryMode);
+            trace.append(" priority=").append(priority);
+            trace.append(" ttl=").append(timeToLive);
+            _log.trace(trace.toString());
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)_producer;
+         publisher.publish(message, deliveryMode, priority, timeToLive);
+         traceSent(message);
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Message message) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            final StringBuilder trace = new StringBuilder("send ").append(this);
+            trace.append(" message=").append(Util.asString(message));
+            _log.trace(trace.toString());
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)_producer;
+         publisher.publish(message);
+         traceSent(message);
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message to the given topic with explicit delivery settings
+    * @param destination The destination
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Topic destination,
+                       final Message message,
+                       final int deliveryMode,
+                       final int priority,
+                       final long timeToLive) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            final StringBuilder trace = new StringBuilder("send ").append(this);
+            trace.append(" destination=").append(destination);
+            trace.append(" message=").append(Util.asString(message));
+            trace.append(" deliveryMode=").append(deliveryMode);
+            trace.append(" priority=").append(priority);
+            trace.append(" ttl=").append(timeToLive);
+            _log.trace(trace.toString());
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)_producer;
+         publisher.publish(destination, message, deliveryMode, priority, timeToLive);
+         traceSent(message);
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message to the given topic
+    * @param destination The destination
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Topic destination, final Message message) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            final StringBuilder trace = new StringBuilder("send ").append(this);
+            trace.append(" destination=").append(destination);
+            trace.append(" message=").append(Util.asString(message));
+            _log.trace(trace.toString());
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)_producer;
+         publisher.publish(destination, message);
+         traceSent(message);
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Trace that a publish call has returned
+    * @param message The message that was handed to the wrapped publisher
+    */
+   private void traceSent(final Message message)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("sent " + this + " result=" + Util.asString(message));
+      }
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JCA wrapper around a JMS {@link TopicSubscriber}.
+ */
+public class QpidRATopicSubscriber extends QpidRAMessageConsumer implements TopicSubscriber
+{
+   /** Trace logger for this wrapper */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRATopicSubscriber.class);
+
+   /**
+    * Wrap the given topic subscriber
+    * @param consumer the topic subscriber to wrap
+    * @param session the session passed on to the superclass
+    */
+   public QpidRATopicSubscriber(final TopicSubscriber consumer, final QpidRASessionImpl session)
+   {
+      super(consumer, session);
+
+      if (_log.isTraceEnabled())
+      {
+         final String arguments = Util.asString(consumer) + ", " + session;
+         _log.trace("constructor(" + arguments + ")");
+      }
+   }
+
+   /**
+    * Ask the wrapped subscriber for its no local value, after calling checkState()
+    * @return The no local value reported by the wrapped subscriber
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getNoLocal() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getNoLocal()");
+      }
+      checkState();
+      final TopicSubscriber subscriber = (TopicSubscriber)_consumer;
+      return subscriber.getNoLocal();
+   }
+
+   /**
+    * Ask the wrapped subscriber for its topic, after calling checkState()
+    * @return The topic reported by the wrapped subscriber
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Topic getTopic() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTopic()");
+      }
+      checkState();
+      final TopicSubscriber subscriber = (TopicSubscriber)_consumer;
+      return subscriber.getTopic();
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,245 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link XAResource} that forwards every call to a wrapped XA resource.
+ */
+public class QpidRAXAResource implements XAResource
+{
+   /** Trace logger for this wrapper */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAXAResource.class);
+
+   /** Managed connection that is locked and flagged around start, end and forget */
+   private final QpidRAManagedConnection _managedConnection;
+
+   /** XA resource that receives the forwarded calls */
+   private final XAResource _xaResource;
+
+   /**
+    * Create a wrapper around the given XA resource
+    * @param managedConnection the managed connection
+    * @param xaResource the xa resource to forward to
+    */
+   public QpidRAXAResource(final QpidRAManagedConnection managedConnection, final XAResource xaResource)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + managedConnection + ", " + Util.asString(xaResource) + ")");
+      }
+      _managedConnection = managedConnection;
+      _xaResource = xaResource;
+   }
+
+   /**
+    * Start; the managed connection is flagged as in a managed tx even if the forwarded call throws
+    * @param xid A global transaction identifier
+    * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
+    * @exception XAException An error has occurred
+    */
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("start(" + xid + ", " + flags + ")");
+      }
+      _managedConnection.lock();
+      try
+      {
+         _xaResource.start(xid, flags);
+      }
+      finally
+      {
+         markAndUnlock(true);
+      }
+   }
+
+   /**
+    * End; the managed tx flag is cleared on the managed connection even if the forwarded call throws
+    * @param xid A global transaction identifier
+    * @param flags One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+    * @exception XAException An error has occurred
+    */
+   public void end(final Xid xid, final int flags) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("end(" + xid + ", " + flags + ")");
+      }
+      _managedConnection.lock();
+      try
+      {
+         _xaResource.end(xid, flags);
+      }
+      finally
+      {
+         markAndUnlock(false);
+      }
+   }
+
+   /**
+    * Prepare; forwarded without locking the managed connection
+    * @param xid A global transaction identifier
+    * @return XA_RDONLY or XA_OK
+    * @exception XAException An error has occurred
+    */
+   public int prepare(final Xid xid) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("prepare(" + xid + ")");
+      }
+      final int vote = _xaResource.prepare(xid);
+      return vote;
+   }
+
+   /**
+    * Commit; forwarded without locking the managed connection
+    * @param xid A global transaction identifier
+    * @param onePhase If true, the resource manager should use a one-phase commit protocol to commit the work done on behalf of xid.
+    * @exception XAException An error has occurred
+    */
+   public void commit(final Xid xid, final boolean onePhase) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("commit(" + xid + ", " + onePhase + ")");
+      }
+      _xaResource.commit(xid, onePhase);
+   }
+
+   /**
+    * Rollback; forwarded without locking the managed connection
+    * @param xid A global transaction identifier
+    * @exception XAException An error has occurred
+    */
+   public void rollback(final Xid xid) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("rollback(" + xid + ")");
+      }
+      _xaResource.rollback(xid);
+   }
+
+   /**
+    * Forget; the managed tx flag is cleared on the managed connection even if the forwarded call throws
+    * @param xid A global transaction identifier
+    * @exception XAException An error has occurred
+    */
+   public void forget(final Xid xid) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("forget(" + xid + ")");
+      }
+      _managedConnection.lock();
+      try
+      {
+         _xaResource.forget(xid);
+      }
+      finally
+      {
+         markAndUnlock(false);
+      }
+   }
+
+   /**
+    * IsSameRM; the argument is handed to the wrapped XA resource as given
+    * @param xaRes An XAResource object whose resource manager instance is to be compared with the resource manager instance of the target object.
+    * @return True if it is the same RM instance; otherwise false.
+    * @exception XAException An error has occurred
+    */
+   public boolean isSameRM(final XAResource xaRes) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("isSameRM(" + xaRes + ")");
+      }
+      final boolean sameRM = _xaResource.isSameRM(xaRes);
+      return sameRM;
+   }
+
+   /**
+    * Recover; forwarded to the wrapped XA resource
+    * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS
+    * @return Zero or more XIDs
+    * @exception XAException An error has occurred
+    */
+   public Xid[] recover(final int flag) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("recover(" + flag + ")");
+      }
+      final Xid[] xids = _xaResource.recover(flag);
+      return xids;
+   }
+
+   /**
+    * Get the transaction timeout in seconds from the wrapped XA resource
+    * @return The transaction timeout
+    * @exception XAException An error has occurred
+    */
+   public int getTransactionTimeout() throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransactionTimeout()");
+      }
+      final int timeout = _xaResource.getTransactionTimeout();
+      return timeout;
+   }
+
+   /**
+    * Set the transaction timeout on the wrapped XA resource
+    * @param seconds The number of seconds
+    * @return True if the transaction timeout value is set successfully; otherwise false.
+    * @exception XAException An error has occurred
+    */
+   public boolean setTransactionTimeout(final int seconds) throws XAException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTransactionTimeout(" + seconds + ")");
+      }
+      final boolean accepted = _xaResource.setTransactionTimeout(seconds);
+      return accepted;
+   }
+
+   /**
+    * Record the managed tx flag on the managed connection, then unlock the managed connection
+    * @param inManagedTx The value handed to setInManagedTx
+    */
+   private void markAndUnlock(final boolean inManagedTx)
+   {
+      _managedConnection.setInManagedTx(inManagedTx);
+      _managedConnection.unlock();
+   }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org