You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC
svn commit: r1220336 [6/8] - in /qpid/trunk/qpid/java: ./
client/src/main/java/org/apache/qpid/client/ jca/ jca/example/
jca/example/conf/ jca/example/src/ jca/example/src/main/
jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRASessionImpl.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,1732 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ManagedConnection;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A joint interface for JMS sessions
+ *
+ */
+public class QpidRASessionImpl implements Session, QueueSession, TopicSession, XASession, XAQueueSession, XATopicSession, QpidRASession
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRASessionImpl.class);
+
+ /** The managed connection */
+ private volatile QpidRAManagedConnection _mc;
+ /** The locked managed connection */
+ private QpidRAManagedConnection _lockedMC;
+
+ /** The connection request info */
+ private final QpidRAConnectionRequestInfo _cri;
+
+ /** The session factory */
+ private QpidRASessionFactory _sf;
+
+ /** The message consumers */
+ private final Set<MessageConsumer> _consumers;
+
+ /** The message producers */
+ private final Set<MessageProducer> _producers;
+
+ /** The queue browsers */
+ private final Set<QueueBrowser> _browsers;
+
+ /** Are we started */
+ private AtomicBoolean _started = new AtomicBoolean(false) ;
+
+ /**
+ * Constructor
+ * @param mc The managed connection
+ * @param cri The connection request info
+ */
+ public QpidRASessionImpl(final QpidRAManagedConnection mc, final QpidRAConnectionRequestInfo cri)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + mc + ", " + cri + ")");
+ }
+
+ this._mc = mc;
+ this._cri = cri;
+ _sf = null;
+ _consumers = new HashSet<MessageConsumer>();
+ _producers = new HashSet<MessageProducer>();
+ _browsers = new HashSet<QueueBrowser>();
+ }
+
+ /**
+ * Set the session factory
+ * @param sf The session factory
+ */
+ public void setQpidSessionFactory(final QpidRASessionFactory sf)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setQpidSessionFactory(" + sf + ")");
+ }
+
+ _started.set(false) ;
+ this._sf = sf;
+ }
+
+ /**
+ * Lock
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ protected void lock() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("lock()");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.tryLock();
+ _lockedMC = mc ;
+ }
+ else
+ {
+ throw new IllegalStateException("Connection is not associated with a managed connection. " + this);
+ }
+ }
+
+ /**
+ * Unlock
+ */
+ protected void unlock()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("unlock()");
+ }
+
+ if (_lockedMC != null)
+ {
+ try
+ {
+ _lockedMC.unlock();
+ }
+ finally
+ {
+ _lockedMC = null ;
+ }
+ }
+
+ // We recreate the lock when returned to the pool
+ // so missing the unlock after disassociation is not important
+ }
+
+ /**
+ * Create a bytes message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public BytesMessage createBytesMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createBytesMessage" + Util.asString(session));
+ }
+
+ return session.createBytesMessage();
+ }
+
+ /**
+ * Create a map message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MapMessage createMapMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createMapMessage" + Util.asString(session));
+ }
+
+ return session.createMapMessage();
+ }
+
+ /**
+ * Create a message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Message createMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createMessage" + Util.asString(session));
+ }
+
+ return session.createMessage();
+ }
+
+ /**
+ * Create an object message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public ObjectMessage createObjectMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createObjectMessage" + Util.asString(session));
+ }
+
+ return session.createObjectMessage();
+ }
+
+ /**
+ * Create an object message
+ * @param object The object
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public ObjectMessage createObjectMessage(final Serializable object) throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createObjectMessage(" + object + ")" + Util.asString(session));
+ }
+
+ return session.createObjectMessage(object);
+ }
+
+ /**
+ * Create a stream message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public StreamMessage createStreamMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createStreamMessage" + Util.asString(session));
+ }
+
+ return session.createStreamMessage();
+ }
+
+ /**
+ * Create a text message
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TextMessage createTextMessage() throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTextMessage" + Util.asString(session));
+ }
+
+ return session.createTextMessage();
+ }
+
+ /**
+ * Create a text message
+ * @param string The text
+ * @return The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TextMessage createTextMessage(final String string) throws JMSException
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTextMessage(" + string + ")" + Util.asString(session));
+ }
+
+ return session.createTextMessage(string);
+ }
+
+ /**
+ * Get transacted
+ * @return True if transacted; otherwise false
+ * @exception JMSException Thrown if an error occurs
+ */
+ public boolean getTransacted() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransacted()");
+ }
+
+ getSessionInternal();
+ return _cri.isTransacted();
+ }
+
+ /**
+ * Get the message listener -- throws IllegalStateException
+ * @return The message listener
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageListener getMessageListener() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMessageListener()");
+ }
+
+ throw new IllegalStateException("Method not allowed");
+ }
+
+ /**
+ * Set the message listener -- Throws IllegalStateException
+ * @param listener The message listener
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setMessageListener(final MessageListener listener) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setMessageListener(" + listener + ")");
+ }
+
+ throw new IllegalStateException("Method not allowed");
+ }
+
+ /**
+ * Always throws an Error.
+ * @exception Error Method not allowed.
+ */
+ public void run()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("run()");
+ }
+
+ throw new Error("Method not allowed");
+ }
+
+ /**
+ * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
+ * managed connection.
+ * @exception JMSException Failed to close session.
+ */
+ public void close() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("close()");
+ }
+
+ _sf.closeSession(this);
+ closeSession();
+ }
+
+ /**
+ * Commit
+ * @exception JMSException Failed to close session.
+ */
+ public void commit() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new TransactionInProgressException("XA connection");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_cri.isTransacted() == false)
+ {
+ throw new IllegalStateException("Session is not transacted");
+ }
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Commit session " + this);
+ }
+
+ session.commit();
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Rollback
+ * @exception JMSException Failed to close session.
+ */
+ public void rollback() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new TransactionInProgressException("XA connection");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_cri.isTransacted() == false)
+ {
+ throw new IllegalStateException("Session is not transacted");
+ }
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Rollback session " + this);
+ }
+
+ session.rollback();
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Recover
+ * @exception JMSException Failed to close session.
+ */
+ public void recover() throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_cri.isTransacted())
+ {
+ throw new IllegalStateException("Session is transacted");
+ }
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Recover session " + this);
+ }
+
+ session.recover();
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic
+ * @param topicName The topic name
+ * @return The topic
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Topic createTopic(final String topicName) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create topic for javax.jms.QueueSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTopic " + Util.asString(session) + " topicName=" + topicName);
+ }
+
+ Topic result = session.createTopic(topicName);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdTopic " + Util.asString(session) + " topic=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a topic subscriber
+ * @param topic The topic
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+ {
+ lock();
+ try
+ {
+ TopicSession session = getTopicSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createSubscriber " + Util.asString(session) + " topic=" + topic);
+ }
+
+ TopicSubscriber result = session.createSubscriber(topic);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic subscriber
+ * @param topic The topic
+ * @param messageSelector The message selector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
+ {
+ lock();
+ try
+ {
+ TopicSession session = getTopicSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createSubscriber " + Util.asString(session) +
+ " topic=" +
+ topic +
+ " selector=" +
+ messageSelector +
+ " noLocal=" +
+ noLocal);
+ }
+
+ TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a durable topic subscriber
+ * @param topic The topic
+ * @param name The name
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createDurableSubscriber " + Util.asString(session) + " topic=" + topic + " name=" + name);
+ }
+
+ TopicSubscriber result = session.createDurableSubscriber(topic, name);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic subscriber
+ * @param topic The topic
+ * @param name The name
+ * @param messageSelector The message selector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection
+ * @return The subscriber
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSubscriber createDurableSubscriber(final Topic topic,
+ final String name,
+ final String messageSelector,
+ final boolean noLocal) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createDurableSubscriber " + Util.asString(session) +
+ " topic=" +
+ topic +
+ " name=" +
+ name +
+ " selector=" +
+ messageSelector +
+ " noLocal=" +
+ noLocal);
+ }
+
+ TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
+ result = new QpidRATopicSubscriber(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a topic publisher
+ * @param topic The topic
+ * @return The publisher
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicPublisher createPublisher(final Topic topic) throws JMSException
+ {
+ lock();
+ try
+ {
+ TopicSession session = getTopicSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createPublisher " + Util.asString(session) + " topic=" + topic);
+ }
+
+ TopicPublisher result = session.createPublisher(topic);
+ result = new QpidRATopicPublisher(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdPublisher " + Util.asString(session) + " publisher=" + result);
+ }
+
+ addProducer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a temporary topic
+ * @return The temporary topic
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTemporaryTopic " + Util.asString(session));
+ }
+
+ TemporaryTopic temp = session.createTemporaryTopic();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdTemporaryTopic " + Util.asString(session) + " temp=" + temp);
+ }
+
+ _sf.addTemporaryTopic(temp);
+
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Unsubscribe
+ * @param name The name
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void unsubscribe(final String name) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("unsubscribe " + Util.asString(session) + " name=" + name);
+ }
+
+ session.unsubscribe(name);
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a browser
+ * @param queue The queue
+ * @return The browser
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueBrowser createBrowser(final Queue queue) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue);
+ }
+
+ QueueBrowser result = session.createBrowser(queue);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a browser
+ * @param queue The queue
+ * @param messageSelector The message selector
+ * @return The browser
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createBrowser " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector);
+ }
+
+ QueueBrowser result = session.createBrowser(queue, messageSelector);
+ result = new QpidRAQueueBrowser(result, this);
+ addQueueBrowser(result) ;
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a queue
+ * @param queueName The queue name
+ * @return The queue
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Queue createQueue(final String queueName) throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create browser or javax.jms.TopicSession");
+ }
+
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createQueue " + Util.asString(session) + " queueName=" + queueName);
+ }
+
+ Queue result = session.createQueue(queueName);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdQueue " + Util.asString(session) + " queue=" + result);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a queue receiver
+ * @param queue The queue
+ * @return The queue receiver
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueReceiver createReceiver(final Queue queue) throws JMSException
+ {
+ lock();
+ try
+ {
+ QueueSession session = getQueueSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createReceiver " + Util.asString(session) + " queue=" + queue);
+ }
+
+ QueueReceiver result = session.createReceiver(queue);
+ result = new QpidRAQueueReceiver(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a queue receiver
+ * @param queue The queue
+ * @param messageSelector
+ * @return The queue receiver
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
+ {
+ lock();
+ try
+ {
+ QueueSession session = getQueueSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createReceiver " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector);
+ }
+
+ QueueReceiver result = session.createReceiver(queue, messageSelector);
+ result = new QpidRAQueueReceiver(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a queue sender
+ * @param queue The queue
+ * @return The queue sender
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueSender createSender(final Queue queue) throws JMSException
+ {
+ lock();
+ try
+ {
+ QueueSession session = getQueueSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createSender " + Util.asString(session) + " queue=" + queue);
+ }
+
+ QueueSender result = session.createSender(queue);
+ result = new QpidRAQueueSender(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdSender " + Util.asString(session) + " sender=" + result);
+ }
+
+ addProducer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a temporary queue
+ * @return The temporary queue
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
+ }
+
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createTemporaryQueue " + Util.asString(session));
+ }
+
+ TemporaryQueue temp = session.createTemporaryQueue();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdTemporaryQueue " + Util.asString(session) + " temp=" + temp);
+ }
+
+ _sf.addTemporaryQueue(temp);
+
+ return temp;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message consumer
+ * @param destination The destination
+ * @return The message consumer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageConsumer createConsumer(final Destination destination) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConsumer " + Util.asString(session) + " dest=" + destination);
+ }
+
+ MessageConsumer result = session.createConsumer(destination);
+ result = new QpidRAMessageConsumer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message consumer
+ * @param destination The destination
+ * @param messageSelector The message selector
+ * @return The message consumer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConsumer " + Util.asString(session) +
+ " dest=" +
+ destination +
+ " messageSelector=" +
+ messageSelector);
+ }
+
+ MessageConsumer result = session.createConsumer(destination, messageSelector);
+ result = new QpidRAMessageConsumer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message consumer
+ * @param destination The destination
+ * @param messageSelector The message selector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection
+ * @return The message consumer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageConsumer createConsumer(final Destination destination,
+ final String messageSelector,
+ final boolean noLocal) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConsumer " + Util.asString(session) +
+ " dest=" +
+ destination +
+ " messageSelector=" +
+ messageSelector +
+ " noLocal=" +
+ noLocal);
+ }
+
+ MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
+ result = new QpidRAMessageConsumer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
+ }
+
+ addConsumer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Create a message producer
+ * @param destination The destination
+ * @return The message producer
+ * @exception JMSException Thrown if an error occurs
+ */
+ public MessageProducer createProducer(final Destination destination) throws JMSException
+ {
+ lock();
+ try
+ {
+ Session session = getSessionInternal();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createProducer " + Util.asString(session) + " dest=" + destination);
+ }
+
+ MessageProducer result = session.createProducer(destination);
+ result = new QpidRAMessageProducer(result, this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createdProducer " + Util.asString(session) + " producer=" + result);
+ }
+
+ addProducer(result);
+
+ return result;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the acknowledge mode
+ * @return The mode
+ * @exception JMSException Thrown if an error occurs
+ */
+ public int getAcknowledgeMode() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getAcknowledgeMode()");
+ }
+
+ getSessionInternal();
+ return _cri.getAcknowledgeMode();
+ }
+
+ /**
+ * Get the XA resource
+ * @return The XA resource
+ * @exception IllegalStateException If non XA connection
+ */
+ public XAResource getXAResource()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getXAResource()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ return null;
+ }
+
+ try
+ {
+ lock();
+
+ return getXAResourceInternal();
+ }
+ catch (Throwable t)
+ {
+ return null;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the session
+ * @return The session
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Session getSession() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSession()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Non XA connection");
+ }
+
+ lock();
+ try
+ {
+ return this;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the queue session
+ * @return The queue session
+ * @exception JMSException Thrown if an error occurs
+ */
+ public QueueSession getQueueSession() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getQueueSession()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Non XA connection");
+ }
+
+ lock();
+ try
+ {
+ return this;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Get the topic session
+ * @return The topic session
+ * @exception JMSException Thrown if an error occurs
+ */
+ public TopicSession getTopicSession() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTopicSession()");
+ }
+
+ if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
+ _cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ throw new IllegalStateException("Non XA connection");
+ }
+
+ lock();
+ try
+ {
+ return this;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ /**
+ * Set the managed connection
+ * @param managedConnection The managed connection
+ */
+ void setManagedConnection(final QpidRAManagedConnection managedConnection)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setManagedConnection(" + managedConnection + ")");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.removeHandle(this);
+ }
+
+ this._mc = managedConnection;
+ }
+
+ /** for tests only */
+ public ManagedConnection getManagedConnection()
+ {
+ return _mc;
+ }
+
+ /**
+ * Destroy
+ */
+ void destroy()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("destroy()");
+ }
+
+ _mc = null;
+ }
+
+ /**
+ * Start
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void start() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start()");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ _started.set(true) ;
+ mc.start();
+ }
+ }
+
+ /**
+ * Stop
+ * @exception JMSException Thrown if an error occurs
+ */
+ void stop() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.stop();
+ _started.set(false) ;
+ }
+ }
+
+ /**
+ * Check strict
+ * @exception JMSException Thrown if an error occurs
+ */
+ void checkStrict() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("checkStrict()");
+ }
+
+ if (_mc != null)
+ {
+ throw new IllegalStateException(QpidRASessionFactory.ISE);
+ }
+ }
+
+ /**
+ * Close session
+ * @exception JMSException Thrown if an error occurs
+ */
+ void closeSession() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ _log.trace("Closing session");
+
+ try
+ {
+ mc.stop();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error stopping managed connection", t);
+ }
+
+ synchronized (_consumers)
+ {
+ for (Iterator<MessageConsumer> i = _consumers.iterator(); i.hasNext();)
+ {
+ QpidRAMessageConsumer consumer = (QpidRAMessageConsumer)i.next();
+ try
+ {
+ consumer.closeConsumer();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error closing consumer", t);
+ }
+ i.remove();
+ }
+ }
+
+ synchronized (_producers)
+ {
+ for (Iterator<MessageProducer> i = _producers.iterator(); i.hasNext();)
+ {
+ QpidRAMessageProducer producer = (QpidRAMessageProducer)i.next();
+ try
+ {
+ producer.closeProducer();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error closing producer", t);
+ }
+ i.remove();
+ }
+ }
+
+ synchronized (_browsers)
+ {
+ for (Iterator<QueueBrowser> i = _browsers.iterator(); i.hasNext();)
+ {
+ QpidRAQueueBrowser browser = (QpidRAQueueBrowser)i.next();
+ try
+ {
+ browser.close();
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Error closing browser", t);
+ }
+ i.remove();
+ }
+ }
+
+ mc.removeHandle(this);
+ ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED);
+ ev.setConnectionHandle(this);
+ mc.sendEvent(ev);
+ this._mc = null;
+ }
+ }
+
+ /**
+ * Add consumer
+ * @param consumer The consumer
+ */
+ void addConsumer(final MessageConsumer consumer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addConsumer(" + consumer + ")");
+ }
+
+ synchronized (_consumers)
+ {
+ _consumers.add(consumer);
+ }
+ }
+
+ /**
+ * Remove consumer
+ * @param consumer The consumer
+ */
+ void removeConsumer(final MessageConsumer consumer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeConsumer(" + consumer + ")");
+ }
+
+ synchronized (_consumers)
+ {
+ _consumers.remove(consumer);
+ }
+ }
+
+ /**
+ * Add producer
+ * @param producer The producer
+ */
+ void addProducer(final MessageProducer producer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addProducer(" + producer + ")");
+ }
+
+ synchronized (_producers)
+ {
+ _producers.add(producer);
+ }
+ }
+
+ /**
+ * Remove producer
+ * @param producer The producer
+ */
+ void removeProducer(final MessageProducer producer)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeProducer(" + producer + ")");
+ }
+
+ synchronized (_producers)
+ {
+ _producers.remove(producer);
+ }
+ }
+
+ /**
+ * Add queue browser
+ * @param browser The queue browser
+ */
+ void addQueueBrowser(final QueueBrowser browser)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addQueueBrowser(" + browser + ")");
+ }
+
+ synchronized (_browsers)
+ {
+ _browsers.add(browser);
+ }
+ }
+
+ /**
+ * Remove queue browser
+ * @param browser The queue browser
+ */
+ void removeQueueBrowser(final QueueBrowser browser)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeQueueBrowser(" + browser + ")");
+ }
+
+ synchronized (_browsers)
+ {
+ _browsers.remove(browser);
+ }
+ }
+
+ /**
+ * Get the session and ensure that it is open
+ * @return The session
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ Session getSessionInternal() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc == null)
+ {
+ throw new IllegalStateException("The session is closed");
+ }
+
+ Session session = mc.getSession();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSessionInternal " + Util.asString(session) + " for " + this);
+ }
+
+ return session;
+ }
+
+ /**
+ * Get the XA resource and ensure that it is open
+ * @return The XA Resource
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ XAResource getXAResourceInternal() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc == null)
+ {
+ throw new IllegalStateException("The session is closed");
+ }
+
+ try
+ {
+ XAResource xares = mc.getXAResource();
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getXAResourceInternal " + xares + " for " + this);
+ }
+
+ return xares;
+ }
+ catch (ResourceException e)
+ {
+ JMSException jmse = new JMSException("Unable to get XA Resource");
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+
+ /**
+ * Get the queue session
+ * @return The queue session
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ QueueSession getQueueSessionInternal() throws JMSException
+ {
+ Session s = getSessionInternal();
+ if (!(s instanceof QueueSession))
+ {
+ throw new InvalidDestinationException("Attempting to use QueueSession methods on: " + this);
+ }
+ return (QueueSession)s;
+ }
+
+ /**
+ * Get the topic session
+ * @return The topic session
+ * @exception JMSException Thrown if an error occurs
+ * @exception IllegalStateException The session is closed
+ */
+ TopicSession getTopicSessionInternal() throws JMSException
+ {
+ Session s = getSessionInternal();
+ if (!(s instanceof TopicSession))
+ {
+ throw new InvalidDestinationException("Attempting to use TopicSession methods on: " + this);
+ }
+ return (TopicSession)s;
+ }
+
+ /**
+ * @throws SystemException
+ * @throws RollbackException
+ *
+ */
+ public void checkState() throws JMSException
+ {
+ final QpidRAManagedConnection mc = this._mc;
+ if (mc != null)
+ {
+ mc.checkTransactionActive();
+ }
+ }
+
+ /**
+ * Has this session been started?
+ * @return true if started, false if stopped.
+ */
+ public boolean isStarted()
+ {
+ return _started.get();
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAStreamMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,415 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message
+ *
+ */
+public class QpidRAStreamMessage extends QpidRAMessage implements StreamMessage
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAStreamMessage.class);
+
+ /**
+ * Create a new wrapper
+ * @param message the message
+ * @param session the session
+ */
+ public QpidRAStreamMessage(final StreamMessage message, final QpidRASessionImpl session)
+ {
+ super(message, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public boolean readBoolean() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readBoolean()");
+ }
+
+ return ((StreamMessage)_message).readBoolean();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public byte readByte() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readByte()");
+ }
+
+ return ((StreamMessage)_message).readByte();
+ }
+
+ /**
+ * Read
+ * @param value The value
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public int readBytes(final byte[] value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readBytes(" + value + ")");
+ }
+
+ return ((StreamMessage)_message).readBytes(value);
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public char readChar() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readChar()");
+ }
+
+ return ((StreamMessage)_message).readChar();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public double readDouble() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readDouble()");
+ }
+
+ return ((StreamMessage)_message).readDouble();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public float readFloat() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readFloat()");
+ }
+
+ return ((StreamMessage)_message).readFloat();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public int readInt() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readInt()");
+ }
+
+ return ((StreamMessage)_message).readInt();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public long readLong() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readLong()");
+ }
+
+ return ((StreamMessage)_message).readLong();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Object readObject() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readObject()");
+ }
+
+ return ((StreamMessage)_message).readObject();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public short readShort() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readShort()");
+ }
+
+ return ((StreamMessage)_message).readShort();
+ }
+
+ /**
+ * Read
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public String readString() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("readString()");
+ }
+
+ return ((StreamMessage)_message).readString();
+ }
+
+ /**
+ * Reset
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void reset() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("reset()");
+ }
+
+ ((StreamMessage)_message).reset();
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeBoolean(final boolean value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeBoolean(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeBoolean(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeByte(final byte value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeByte(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeByte(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @param offset The offset
+ * @param length The length
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeBytes(" + value + ", " + offset + ", " + length + ")");
+ }
+
+ ((StreamMessage)_message).writeBytes(value, offset, length);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeBytes(final byte[] value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeBytes(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeBytes(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeChar(final char value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeChar(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeChar(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeDouble(final double value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeDouble(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeDouble(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeFloat(final float value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeFloat(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeFloat(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeInt(final int value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeInt(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeInt(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeLong(final long value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeLong(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeLong(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeObject(final Object value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeObject(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeObject(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeShort(final short value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeShort(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeShort(value);
+ }
+
+ /**
+ * Write
+ * @param value The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void writeString(final String value) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("writeString(" + value + ")");
+ }
+
+ ((StreamMessage)_message).writeString(value);
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATextMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message
+ *
+ */
+public class QpidRATextMessage extends QpidRAMessage implements TextMessage
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRATextMessage.class);
+
+ /**
+ * Create a new wrapper
+ * @param message the message
+ * @param session the session
+ */
+ public QpidRATextMessage(final TextMessage message, final QpidRASessionImpl session)
+ {
+ super(message, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Get text
+ * @return The text
+ * @exception JMSException Thrown if an error occurs
+ */
+ public String getText() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getText()");
+ }
+
+ return ((TextMessage)_message).getText();
+ }
+
+ /**
+ * Set text
+ * @param string The text
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void setText(final String string) throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setText(" + string + ")");
+ }
+
+ ((TextMessage)_message).setText(string);
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicPublisher.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QpidRATopicPublisher.
+ *
+ */
+public class QpidRATopicPublisher extends QpidRAMessageProducer implements TopicPublisher
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRATopicPublisher.class);
+
+ /**
+ * Create a new wrapper
+ * @param producer the producer
+ * @param session the session
+ */
+ public QpidRATopicPublisher(final TopicPublisher producer, final QpidRASessionImpl session)
+ {
+ super(producer, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(producer) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Get the topic
+ * @return The topic
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Topic getTopic() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTopic()");
+ }
+
+ return ((TopicPublisher)_producer).getTopic();
+ }
+
+ /**
+ * Publish message
+ * @param message The message
+ * @param deliveryMode The delivery mode
+ * @param priority The priority
+ * @param timeToLive The time to live
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this +
+ " message=" +
+ Util.asString(message) +
+ " deliveryMode=" +
+ deliveryMode +
+ " priority=" +
+ priority +
+ " ttl=" +
+ timeToLive);
+ }
+
+ checkState();
+
+ ((TopicPublisher)_producer).publish(message, deliveryMode, priority, timeToLive);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Publish message
+ * @param message The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void publish(final Message message) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this + " message=" + Util.asString(message));
+ }
+
+ checkState();
+
+ ((TopicPublisher)_producer).publish(message);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Publish message
+ * @param destination The destination
+ * @param message The message
+ * @param deliveryMode The delivery mode
+ * @param priority The priority
+ * @param timeToLive The time to live
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void publish(final Topic destination,
+ final Message message,
+ final int deliveryMode,
+ final int priority,
+ final long timeToLive) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this +
+ " destination=" +
+ destination +
+ " message=" +
+ Util.asString(message) +
+ " deliveryMode=" +
+ deliveryMode +
+ " priority=" +
+ priority +
+ " ttl=" +
+ timeToLive);
+ }
+
+ checkState();
+
+ ((TopicPublisher)_producer).publish(destination, message, deliveryMode, priority, timeToLive);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+
+ /**
+ * Publish message
+ * @param destination The destination
+ * @param message The message
+ * @exception JMSException Thrown if an error occurs
+ */
+ public void publish(final Topic destination, final Message message) throws JMSException
+ {
+ _session.lock();
+ try
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("send " + this + " destination=" + destination + " message=" + Util.asString(message));
+ }
+
+ checkState();
+
+ ((TopicPublisher)_producer).publish(destination, message);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sent " + this + " result=" + Util.asString(message));
+ }
+ }
+ finally
+ {
+ _session.unlock();
+ }
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRATopicSubscriber.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a topic subscriber
+ *
+ */
+public class QpidRATopicSubscriber extends QpidRAMessageConsumer implements TopicSubscriber
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRATopicSubscriber.class);
+
+ /**
+ * Create a new wrapper
+ * @param consumer the topic subscriber
+ * @param session the session
+ */
+ public QpidRATopicSubscriber(final TopicSubscriber consumer, final QpidRASessionImpl session)
+ {
+ super(consumer, session);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + Util.asString(consumer) + ", " + session + ")");
+ }
+ }
+
+ /**
+ * Get the no local value
+ * @return The value
+ * @exception JMSException Thrown if an error occurs
+ */
+ public boolean getNoLocal() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getNoLocal()");
+ }
+
+ checkState();
+ return ((TopicSubscriber)_consumer).getNoLocal();
+ }
+
+ /**
+ * Get the topic
+ * @return The topic
+ * @exception JMSException Thrown if an error occurs
+ */
+ public Topic getTopic() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTopic()");
+ }
+
+ checkState();
+ return ((TopicSubscriber)_consumer).getTopic();
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAXAResource.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,245 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QpidRAXAResource.
+ *
+ */
+public class QpidRAXAResource implements XAResource
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAXAResource.class);
+
+ /** The managed connection */
+ private final QpidRAManagedConnection _managedConnection;
+
+ /** The resource */
+ private final XAResource _xaResource;
+
+ /**
+ * Create a new QpidRAXAResource.
+ * @param managedConnection the managed connection
+ * @param xaResource the xa resource
+ */
+ public QpidRAXAResource(final QpidRAManagedConnection managedConnection, final XAResource xaResource)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + managedConnection + ", " + Util.asString(xaResource) + ")");
+ }
+
+ this._managedConnection = managedConnection;
+ this._xaResource = xaResource;
+ }
+
+ /**
+ * Start
+ * @param xid A global transaction identifier
+ * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
+ * @exception XAException An error has occurred
+ */
+ public void start(final Xid xid, final int flags) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start(" + xid + ", " + flags + ")");
+ }
+
+ _managedConnection.lock();
+ try
+ {
+ _xaResource.start(xid, flags);
+ }
+ finally
+ {
+ _managedConnection.setInManagedTx(true);
+ _managedConnection.unlock();
+ }
+ }
+
+ /**
+ * End
+ * @param xid A global transaction identifier
+ * @param flags One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+ * @exception XAException An error has occurred
+ */
+ public void end(final Xid xid, final int flags) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("end(" + xid + ", " + flags + ")");
+ }
+
+ _managedConnection.lock();
+ try
+ {
+ _xaResource.end(xid, flags);
+ }
+ finally
+ {
+ _managedConnection.setInManagedTx(false);
+ _managedConnection.unlock();
+ }
+ }
+
+ /**
+ * Prepare
+ * @param xid A global transaction identifier
+ * @return XA_RDONLY or XA_OK
+ * @exception XAException An error has occurred
+ */
+ public int prepare(final Xid xid) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("prepare(" + xid + ")");
+ }
+
+ return _xaResource.prepare(xid);
+ }
+
+ /**
+ * Commit
+ * @param xid A global transaction identifier
+ * @param onePhase If true, the resource manager should use a one-phase commit protocol to commit the work done on behalf of xid.
+ * @exception XAException An error has occurred
+ */
+ public void commit(final Xid xid, final boolean onePhase) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("commit(" + xid + ", " + onePhase + ")");
+ }
+
+ _xaResource.commit(xid, onePhase);
+ }
+
+ /**
+ * Rollback
+ * @param xid A global transaction identifier
+ * @exception XAException An error has occurred
+ */
+ public void rollback(final Xid xid) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("rollback(" + xid + ")");
+ }
+
+ _xaResource.rollback(xid);
+ }
+
+ /**
+ * Forget
+ * @param xid A global transaction identifier
+ * @exception XAException An error has occurred
+ */
+ public void forget(final Xid xid) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("forget(" + xid + ")");
+ }
+
+ _managedConnection.lock();
+ try
+ {
+ _xaResource.forget(xid);
+ }
+ finally
+ {
+ _managedConnection.setInManagedTx(false);
+ _managedConnection.unlock();
+ }
+ }
+
+ /**
+ * IsSameRM
+ * @param xaRes An XAResource object whose resource manager instance is to be compared with the resource manager instance of the target object.
+ * @return True if its the same RM instance; otherwise false.
+ * @exception XAException An error has occurred
+ */
+ public boolean isSameRM(final XAResource xaRes) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isSameRM(" + xaRes + ")");
+ }
+
+ return _xaResource.isSameRM(xaRes);
+ }
+
+ /**
+ * Recover
+ * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS
+ * @return Zero or more XIDs
+ * @exception XAException An error has occurred
+ */
+ public Xid[] recover(final int flag) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("recover(" + flag + ")");
+ }
+
+ return _xaResource.recover(flag);
+ }
+
+ /**
+ * Get the transaction timeout in seconds
+ * @return The transaction timeout
+ * @exception XAException An error has occurred
+ */
+ public int getTransactionTimeout() throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionTimeout()");
+ }
+
+ return _xaResource.getTransactionTimeout();
+ }
+
+ /**
+ * Set the transaction timeout
+ * @param seconds The number of seconds
+ * @return True if the transaction timeout value is set successfully; otherwise false.
+ * @exception XAException An error has occurred
+ */
+ public boolean setTransactionTimeout(final int seconds) throws XAException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionTimeout(" + seconds + ")");
+ }
+
+ return _xaResource.setTransactionTimeout(seconds);
+ }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org