You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC
svn commit: r1220336 [4/8] - in /qpid/trunk/qpid/java: ./
client/src/main/java/org/apache/qpid/client/ jca/ jca/example/
jca/example/conf/ jca/example/src/ jca/example/src/main/
jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,880 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ConnectionEventListener;
+import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.IllegalStateException;
+import javax.resource.spi.LocalTransaction;
+import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.ManagedConnectionMetaData;
+import javax.resource.spi.SecurityException;
+import javax.security.auth.Subject;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The managed connection
+ *
+ */
+public class QpidRAManagedConnection implements ManagedConnection, ExceptionListener
+{
+ /** The logger */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAManagedConnection.class);
+
+ /** The managed connection factory */
+ private final QpidRAManagedConnectionFactory _mcf;
+
+ /** The connection request information */
+ private final QpidRAConnectionRequestInfo _cri;
+
+ /** The user name */
+ private final String _userName;
+
+ /** The password */
+ private final String _password;
+
+ /** Has the connection been destroyed */
+ private final AtomicBoolean _isDestroyed = new AtomicBoolean(false);
+
+ /** Event listeners */
+ private final List<ConnectionEventListener> _eventListeners;
+
+ /** Handles */
+ private final Set<QpidRASessionImpl> _handles;
+
+ /** Lock */
+ private ReentrantLock _lock = new ReentrantLock();
+
+ // Physical JMS connection stuff
+ private Connection _connection;
+
+ private XASession _xaSession;
+
+ private XAResource _xaResource;
+
+ private Session _session;
+
+ private final TransactionManager _tm;
+
+ private boolean _inManagedTx;
+
+ /**
+ * Constructor.
+ * <p>
+ * Stores the collaborators, creates the (synchronized) listener and handle
+ * collections and then calls setup() to create the physical JMS connection.
+ * If setup() fails for any reason, destroy() is attempted on a best-effort
+ * basis and the original failure is rethrown wrapped in a ResourceException.
+ * @param mcf The managed connection factory
+ * @param cri The connection request information
+ * @param tm The transaction manager consulted by checkTransactionActive(); a null value disables that check
+ * @param userName The user name
+ * @param password The password (never written to the trace log)
+ * @exception ResourceException Thrown with message "Error during setup" if the connection could not be set up
+ */
+ public QpidRAManagedConnection(final QpidRAManagedConnectionFactory mcf,
+ final QpidRAConnectionRequestInfo cri,
+ final TransactionManager tm,
+ final String userName,
+ final String password) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + mcf + ", " + cri + ", " + userName + ", ****)");
+ }
+
+ this._mcf = mcf;
+ this._cri = cri;
+ this._tm = tm;
+ this._userName = userName;
+ this._password = password;
+ _eventListeners = Collections.synchronizedList(new ArrayList<ConnectionEventListener>());
+ _handles = Collections.synchronizedSet(new HashSet<QpidRASessionImpl>());
+
+ try
+ {
+ setup();
+ }
+ catch (Throwable t)
+ {
+ try
+ {
+ destroy(); // best-effort release of whatever setup() managed to create
+ }
+ catch (Throwable ignored)
+ { // deliberately ignored: the setup failure below is the error worth reporting
+ }
+ throw new ResourceException("Error during setup", t);
+ }
+ }
+
+    /**
+     * Create a new connection handle that is backed by this managed connection.
+     * <p>
+     * The user name resolved from the subject / request info must be the same as
+     * the one this managed connection was created with (both may be null);
+     * re-authentication is not supported.
+     * @param subject The security subject
+     * @param cxRequestInfo The request info
+     * @return The new handle, which is also registered in the handle set
+     * @exception ResourceException Thrown if the credentials differ or the managed connection is destroyed
+     */
+    public synchronized Object getConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getConnection(" + subject + ", " + cxRequestInfo + ")");
+        }
+
+        // Check user first
+        final String requestedUser = QpidRACredential.getCredential(_mcf, subject, cxRequestInfo).getUserName();
+
+        // Null users are allowed, but then the requested user has to be null as well
+        final boolean sameUser = (_userName == null) ? (requestedUser == null) : _userName.equals(requestedUser);
+        if (!sameUser)
+        {
+            throw new SecurityException("Password credentials not the same, reauthentication not allowed");
+        }
+
+        if (_isDestroyed.get())
+        {
+            throw new IllegalStateException("The managed connection is already destroyed");
+        }
+
+        final QpidRASessionImpl handle = new QpidRASessionImpl(this, (QpidRAConnectionRequestInfo)cxRequestInfo);
+        _handles.add(handle);
+        return handle;
+    }
+
+ /**
+ * Destroy all handles.
+ * @exception ResourceException Failed to close one or more handles.
+ */
+ private void destroyHandles() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("destroyHandles()");
+ }
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.trace("Ignored error stopping connection", t);
+ }
+
+ for (QpidRASessionImpl session : _handles)
+ {
+ session.destroy();
+ }
+
+ _handles.clear();
+ }
+
+    /**
+     * Destroy the physical connection.
+     * <p>
+     * The call is a no-op when no physical connection was ever created or when
+     * the managed connection has already been destroyed. Otherwise the exception
+     * listener is removed, all handles are destroyed and the XA session and the
+     * connection are closed. The connection is closed even if destroying the
+     * handles fails.
+     * @exception ResourceException Could not properly close the session and connection.
+     */
+    public void destroy() throws ResourceException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("destroy()");
+        }
+
+        // compareAndSet makes "check and mark as destroyed" a single atomic step,
+        // so two racing callers cannot both run the teardown below.
+        if (_connection == null || !_isDestroyed.compareAndSet(false, true))
+        {
+            return;
+        }
+
+        try
+        {
+            _connection.setExceptionListener(null);
+        }
+        catch (JMSException e)
+        {
+            _log.debug("Error unsetting the exception listener " + this, e);
+        }
+
+        try
+        {
+            destroyHandles();
+        }
+        finally
+        {
+            // Always release the physical resources, even when a handle failed to close
+            closeSessionAndConnection();
+        }
+    }
+
+    /**
+     * Close the XA session (failures are only logged) and then the physical connection.
+     * @exception ResourceException Could not properly close the session and connection.
+     */
+    private void closeSessionAndConnection() throws ResourceException
+    {
+        try
+        {
+            try
+            {
+                if (_xaSession != null)
+                {
+                    _xaSession.close();
+                }
+            }
+            catch (JMSException e)
+            {
+                _log.debug("Error closing session " + this, e);
+            }
+
+            if (_connection != null)
+            {
+                _connection.close();
+            }
+        }
+        catch (Throwable e)
+        {
+            throw new ResourceException("Could not properly close the session and connection", e);
+        }
+    }
+
+ /**
+ * Cleanup: reset the per-use state of this managed connection.
+ * <p>
+ * Destroys every outstanding handle via destroyHandles() (which also stops,
+ * but does not close, the physical connection), clears the
+ * in-managed-transaction flag and installs a fresh lock object.
+ * @exception ResourceException Thrown if an error occurs
+ * @exception IllegalStateException Thrown if this managed connection has already been destroyed
+ */
+ public void cleanup() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("cleanup()");
+ }
+
+ if (_isDestroyed.get())
+ {
+ throw new IllegalStateException("ManagedConnection already destroyed");
+ }
+
+ destroyHandles();
+
+ _inManagedTx = false;
+
+ // I'm recreating the lock object when we return to the pool
+ // because it looks too nasty to expect the connection handle
+ // to unlock properly in certain race conditions
+ // where the dissociation of the managed connection is "random".
+ _lock = new ReentrantLock(); // NOTE(review): a thread still holding the old lock will find unlock() a no-op on the new one - confirm this is intended
+ }
+
+ /**
+ * Move a handler from one mc to this one.
+ * @param obj An object of type QpidRASession.
+ * @throws ResourceException Failed to associate connection.
+ * @throws IllegalStateException ManagedConnection in an illegal state.
+ */
+ public void associateConnection(final Object obj) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("associateConnection(" + obj + ")");
+ }
+
+ if (!_isDestroyed.get() && obj instanceof QpidRASessionImpl)
+ {
+ QpidRASessionImpl h = (QpidRASessionImpl)obj;
+ h.setManagedConnection(this);
+ _handles.add(h);
+ }
+ else
+ {
+ throw new IllegalStateException("ManagedConnection in an illegal state");
+ }
+ }
+
+    /**
+     * Verify that the transaction associated with the calling thread, if any,
+     * is in a state in which work can still be performed.
+     * <p>
+     * The check is skipped while this connection is enlisted in a managed
+     * transaction or when no transaction manager is available.
+     * @exception JMSException Thrown (as javax.jms.IllegalStateException) if the
+     *            transaction is not active or the transaction manager fails
+     */
+    public void checkTransactionActive() throws JMSException
+    {
+        // don't bother looking at the transaction if there's an active XID
+        if (_inManagedTx || _tm == null)
+        {
+            return;
+        }
+
+        try
+        {
+            final Transaction current = _tm.getTransaction();
+            if (current == null)
+            {
+                return;
+            }
+
+            switch (current.getStatus())
+            {
+                // Only allow states that will actually succeed
+                case Status.STATUS_ACTIVE:
+                case Status.STATUS_PREPARING:
+                case Status.STATUS_PREPARED:
+                case Status.STATUS_COMMITTING:
+                    return;
+
+                default:
+                    throw new javax.jms.IllegalStateException("Transaction " + current + " not active");
+            }
+        }
+        catch (SystemException e)
+        {
+            final JMSException failure = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction");
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
+
+ /**
+ * Acquire a lock on the managed connection
+ */
+ protected void lock()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("lock()");
+ }
+
+ _lock.lock();
+ }
+
+ /**
+ * Aqquire a lock on the managed connection within the specified period
+ * @exception JMSException Thrown if an error occurs
+ */
+ protected void tryLock() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("tryLock()");
+ }
+
+ Integer tryLock = _mcf.getUseTryLock();
+ if (tryLock == null || tryLock.intValue() <= 0)
+ {
+ lock();
+ return;
+ }
+ try
+ {
+ if (_lock.tryLock(tryLock.intValue(), TimeUnit.SECONDS) == false)
+ {
+ throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new ResourceAllocationException("Interrupted attempting lock: " + this);
+ }
+ }
+
+ /**
+ * Unlock the managed connection
+ */
+ protected void unlock()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("unlock()");
+ }
+
+ if (_lock.isHeldByCurrentThread())
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Add a connection event listener.
+ * @param l The connection event listener to be added.
+ */
+ public void addConnectionEventListener(final ConnectionEventListener l)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("addConnectionEventListener(" + l + ")");
+ }
+
+ _eventListeners.add(l);
+ }
+
+ /**
+ * Remove a connection event listener.
+ * @param l The connection event listener to be removed.
+ */
+ public void removeConnectionEventListener(final ConnectionEventListener l)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeConnectionEventListener(" + l + ")");
+ }
+
+ _eventListeners.remove(l);
+ }
+
+    /**
+     * Get the XAResource for the connection.
+     * <p>
+     * The resource is created lazily from the XA session and cached, so every
+     * call returns the same instance.
+     * @return The XAResource for the connection.
+     * @exception ResourceException XA transaction not supported: no XA session exists,
+     *            which is the case when the connection was set up for local transactions
+     */
+    public XAResource getXAResource() throws ResourceException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getXAResource()");
+        }
+
+        //
+        // Spec says a mc must always return the same XA resource,
+        // so we cache it.
+        //
+        if (_xaResource == null)
+        {
+            // setup() only creates an XA session when local transactions are not
+            // in use; report that properly instead of failing with a
+            // NullPointerException.
+            if (_xaSession == null)
+            {
+                throw new javax.resource.NotSupportedException("XA transactions are not supported, no XA session is available: " + this);
+            }
+
+            _xaResource = new QpidRAXAResource(this, _xaSession.getXAResource());
+        }
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("XAResource=" + _xaResource);
+        }
+
+        return _xaResource;
+    }
+
+ /**
+ * Get the local transaction for the connection.
+ * @return The local transaction for the connection.
+ * @exception ResourceException Thrown if operation fails.
+ */
+ public LocalTransaction getLocalTransaction() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getLocalTransaction()");
+ }
+
+ LocalTransaction tx = new QpidRALocalTransaction(this);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("LocalTransaction=" + tx);
+ }
+
+ return tx;
+ }
+
+ /**
+ * Get the meta data for the connection.
+ * @return The meta data for the connection.
+ * @exception ResourceException Thrown if the operation fails.
+ * @exception IllegalStateException Thrown if the managed connection already is destroyed.
+ */
+ public ManagedConnectionMetaData getMetaData() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMetaData()");
+ }
+
+ if (_isDestroyed.get())
+ {
+ throw new IllegalStateException("The managed connection is already destroyed");
+ }
+
+ return new QpidRAMetaData(this);
+ }
+
+ /**
+ * Set the log writer -- NOT SUPPORTED
+ * @param out The log writer
+ * @exception ResourceException If operation fails
+ */
+ public void setLogWriter(final PrintWriter out) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setLogWriter(" + out + ")");
+ }
+ }
+
+ /**
+ * Get the log writer -- NOT SUPPORTED
+ * @return Always null
+ * @exception ResourceException If operation fails
+ */
+ public PrintWriter getLogWriter() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getLogWriter()");
+ }
+
+ return null;
+ }
+
+ /**
+ * Notifies user of a JMS exception.
+ * @param exception The JMS exception
+ */
+ public void onException(final JMSException exception)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("onException(" + exception + ")");
+ }
+
+ if (_isDestroyed.get())
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Ignoring error on already destroyed connection " + this, exception);
+ }
+ return;
+ }
+
+ _log.warn("Handling JMS exception failure: " + this, exception);
+
+ try
+ {
+ _connection.setExceptionListener(null);
+ }
+ catch (JMSException e)
+ {
+ _log.debug("Unable to unset exception listener", e);
+ }
+
+ ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception);
+ sendEvent(event);
+ }
+
+ /**
+ * Get the session for this connection.
+ * @return The session
+ * @throws JMSException
+ */
+ protected Session getSession() throws JMSException
+ {
+ if(_xaSession != null && !_mcf.getUseLocalTx())
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSession() -> XA session " + Util.asString(_xaSession));
+ }
+
+ return _xaSession;
+ }
+ else
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSession() -> session " + Util.asString(_session));
+ }
+
+ return _session;
+ }
+ }
+
+ /**
+ * Send an event.
+ * @param event The event to send.
+ */
+ protected void sendEvent(final ConnectionEvent event)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("sendEvent(" + event + ")");
+ }
+
+ int type = event.getId();
+
+ // convert to an array to avoid concurrent modification exceptions
+ ConnectionEventListener[] list = _eventListeners.toArray(new ConnectionEventListener[_eventListeners.size()]);
+
+ for (ConnectionEventListener l : list)
+ {
+ switch (type)
+ {
+ case ConnectionEvent.CONNECTION_CLOSED:
+ l.connectionClosed(event);
+ break;
+
+ case ConnectionEvent.LOCAL_TRANSACTION_STARTED:
+ l.localTransactionStarted(event);
+ break;
+
+ case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED:
+ l.localTransactionCommitted(event);
+ break;
+
+ case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK:
+ l.localTransactionRolledback(event);
+ break;
+
+ case ConnectionEvent.CONNECTION_ERROR_OCCURRED:
+ l.connectionErrorOccurred(event);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Illegal eventType: " + type);
+ }
+ }
+ }
+
+ /**
+ * Remove a handle from the handle map.
+ * @param handle The handle to remove.
+ */
+ protected void removeHandle(final QpidRASessionImpl handle)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("removeHandle(" + handle + ")");
+ }
+
+ _handles.remove(handle);
+ }
+
+ /**
+ * Get the request info for this connection.
+ * @return The connection request info for this connection.
+ */
+ protected QpidRAConnectionRequestInfo getCRI()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getCRI()");
+ }
+
+ return _cri;
+ }
+
+ /**
+ * Get the connection factory for this connection.
+ * @return The connection factory for this connection.
+ */
+ protected QpidRAManagedConnectionFactory getManagedConnectionFactory()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getManagedConnectionFactory()");
+ }
+
+ return _mcf;
+ }
+
+ /**
+ * Start the connection
+ * @exception JMSException Thrown if the connection cant be started
+ */
+ void start() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start()");
+ }
+
+ if (_connection != null)
+ {
+ _connection.start();
+ }
+ }
+
+ /**
+ * Stop the connection
+ * @exception JMSException Thrown if the connection cant be stopped
+ */
+ void stop() throws JMSException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+
+ /**
+ * Get the user name
+ * @return The user name
+ */
+ protected String getUserName()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUserName()");
+ }
+
+ return _userName;
+ }
+
+ /**
+ * Setup the connection.
+ * @exception ResourceException Thrown if a connection couldnt be created
+ */
+ private void setup() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setup()");
+ }
+
+ try
+ {
+ boolean transacted = _cri.isTransacted();
+ int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ boolean localTx = _mcf.getUseLocalTx();
+
+ if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+ {
+ if (_userName != null && _password != null)
+ {
+ if(!localTx)
+ {
+ _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password);
+ }
+ else
+ {
+ _connection = _mcf.getCleanAMQConnectionFactory().createTopicConnection();
+ }
+ }
+ else
+ {
+ if(!localTx)
+ {
+ _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection();
+ }
+ else
+ {
+ _connection = _mcf.getDefaultAMQConnectionFactory().createTopicConnection();
+ }
+ }
+
+ if(!localTx)
+ {
+ _xaSession = ((XATopicConnection)_connection).createXATopicSession();
+
+ }
+ else
+ {
+ _session = ((TopicConnection)_connection).createTopicSession(localTx, acknowledgeMode);
+ }
+ }
+ else if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION)
+ {
+ if (_userName != null && _password != null)
+ {
+ if(!localTx)
+ {
+ _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password);
+ }
+ else
+ {
+ _connection = _mcf.getCleanAMQConnectionFactory().createQueueConnection();
+ }
+ }
+ else
+ {
+ if(!localTx)
+ {
+ _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection();
+ }
+ else
+ {
+ _connection = _mcf.getDefaultAMQConnectionFactory().createQueueConnection();
+ }
+ }
+
+ if(!localTx)
+ {
+ _xaSession = ((XAQueueConnection)_connection).createXAQueueSession();
+
+ }
+ else
+ {
+ _session = ((QueueConnection)_connection).createQueueSession(localTx, acknowledgeMode);
+
+ }
+ }
+ else
+ {
+ if (_userName != null && _password != null)
+ {
+ if(!localTx)
+ {
+ _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password);
+ }
+ else
+ {
+ _connection = _mcf.getCleanAMQConnectionFactory().createConnection();
+ }
+ }
+ else
+ {
+ if(!localTx)
+ {
+ _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection();
+ }
+ else
+ {
+ _connection = _mcf.getDefaultAMQConnectionFactory().createConnection();
+ }
+ }
+
+ if(!localTx)
+ {
+ _xaSession = ((XAQueueConnection)_connection).createXASession();
+
+ }
+ else
+ {
+ _session = ((QueueConnection)_connection).createSession(localTx, acknowledgeMode);
+
+ }
+ }
+
+ _connection.setExceptionListener(this);
+ }
+ catch (JMSException je)
+ {
+ throw new ResourceException(je.getMessage(), je);
+ }
+ }
+
+ protected void setInManagedTx(boolean inManagedTx)
+ {
+ this._inManagedTx = inManagedTx;
+ }
+
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,623 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.PrintWriter;
+import java.util.Set;
+
+import javax.jms.ConnectionMetaData;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.ManagedConnectionFactory;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterAssociation;
+import javax.security.auth.Subject;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Qpid ManagedConnectionFactory
+ *
+ */
+public class QpidRAManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation
+{
+ /**
+ * Serial version UID
+ */
+ private static final long serialVersionUID = -8798804592247643959L;
+
+ /**
+ * The logger
+ */
+ private static final Logger _log = LoggerFactory.getLogger(QpidRAManagedConnectionFactory.class);
+
+ /**
+ * The resource adapter
+ */
+ private QpidResourceAdapter _ra;
+
+ /**
+ * Connection manager
+ */
+ private ConnectionManager _cm;
+
+ /**
+ * The managed connection factory properties
+ */
+ private final QpidRAMCFProperties _mcfProperties;
+
+ /**
+ * Connection Factory used if properties are set
+ */
+ private AMQConnectionFactory _connectionFactory;
+
+ /**
+ * Constructor
+ */
+ public QpidRAManagedConnectionFactory()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor()");
+ }
+
+ _ra = null;
+ _cm = null;
+ _mcfProperties = new QpidRAMCFProperties();
+ }
+
+ /**
+ * Creates a Connection Factory instance
+ *
+ * @return javax.resource.cci.ConnectionFactory instance
+ * @throws ResourceException Thrown if a connection factory cant be created
+ */
+ public Object createConnectionFactory() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConnectionFactory()");
+ }
+
+ return createConnectionFactory(new QpidRAConnectionManager());
+ }
+
+ /**
+ * Creates a Connection Factory instance
+ *
+ * @param cxManager The connection manager
+ * @return javax.resource.cci.ConnectionFactory instance
+ * @throws ResourceException Thrown if a connection factory cant be created
+ */
+ public Object createConnectionFactory(final ConnectionManager cxManager) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createConnectionFactory(" + cxManager + ")");
+ }
+
+ _cm = cxManager;
+
+ QpidRAConnectionFactory cf = new QpidRAConnectionFactoryImpl(this, _cm);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Created connection factory: " + cf +
+ ", using connection manager: " +
+ _cm);
+ }
+
+ return cf;
+ }
+
+ /**
+ * Creates a new physical connection to the underlying EIS resource manager.
+ *
+ * @param subject Caller's security information
+ * @param cxRequestInfo Additional resource adapter specific connection request information
+ * @return The managed connection
+ * @throws ResourceException Thrown if a managed connection cant be created
+ */
+ public ManagedConnection createManagedConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("createManagedConnection(" + subject + ", " + cxRequestInfo + ")");
+ }
+
+ QpidRAConnectionRequestInfo cri = getCRI((QpidRAConnectionRequestInfo)cxRequestInfo);
+
+ QpidRACredential credential = QpidRACredential.getCredential(this, subject, cri);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("jms credential: " + credential);
+ }
+
+ QpidRAManagedConnection mc = new QpidRAManagedConnection(this,
+ cri,
+ _ra.getTM(),
+ credential.getUserName(),
+ credential.getPassword());
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("created new managed connection: " + mc);
+ }
+
+ return mc;
+ }
+
+    /**
+     * Returns a matched connection from the candidate set of connections.
+     * <p>
+     * A candidate matches when it is a QpidRAManagedConnection that was created
+     * by a factory equal to this one, with the same user name as the one resolved
+     * from the subject / request info (both may be null) and an equal connection
+     * request info. The user name rule is the same one that
+     * QpidRAManagedConnection.getConnection() enforces, so a connection returned
+     * here is not rejected there for a credential mismatch.
+     *
+     * @param connectionSet The candidate connection set
+     * @param subject Caller's security information
+     * @param cxRequestInfo Additional resource adapter specific connection request information
+     * @return The managed connection, or <code>null</code> if no candidate matches
+     * @throws ResourceException Thrown if the credentials or request info cannot be resolved
+     */
+    @SuppressWarnings("rawtypes")
+    public ManagedConnection matchManagedConnections(final Set connectionSet,
+                                                     final Subject subject,
+                                                     final ConnectionRequestInfo cxRequestInfo) throws ResourceException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("matchManagedConnections(" + connectionSet +
+                       ", " +
+                       subject +
+                       ", " +
+                       cxRequestInfo +
+                       ")");
+        }
+
+        final QpidRAConnectionRequestInfo cri = getCRI((QpidRAConnectionRequestInfo)cxRequestInfo);
+        final QpidRACredential credential = QpidRACredential.getCredential(this, subject, cri);
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("Looking for connection matching credentials: " + credential);
+        }
+
+        final String requestedUser = credential.getUserName();
+
+        for (final Object obj : connectionSet)
+        {
+            if (!(obj instanceof QpidRAManagedConnection))
+            {
+                continue;
+            }
+
+            final QpidRAManagedConnection mc = (QpidRAManagedConnection)obj;
+            final String mcUser = mc.getUserName();
+
+            // A connection created without a user only matches a request without a
+            // user; previously it matched any request and was then refused by
+            // getConnection() with a SecurityException.
+            final boolean sameUser = (mcUser == null) ? (requestedUser == null) : mcUser.equals(requestedUser);
+
+            if (sameUser && mc.getManagedConnectionFactory().equals(this) && cri.equals(mc.getCRI()))
+            {
+                if (_log.isTraceEnabled())
+                {
+                    _log.trace("Found matching connection: " + mc);
+                }
+
+                return mc;
+            }
+        }
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("No matching connection was found");
+        }
+
+        return null;
+    }
+
+ /**
+ * Set the log writer -- NOT SUPPORTED
+ *
+ * @param out The writer
+ * @throws ResourceException Thrown if the writer cant be set
+ */
+ public void setLogWriter(final PrintWriter out) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setLogWriter(" + out + ")");
+ }
+ }
+
+ /**
+ * Get the log writer -- NOT SUPPORTED
+ *
+ * @return The writer
+ * @throws ResourceException Thrown if the writer cant be retrieved
+ */
+ public PrintWriter getLogWriter() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getLogWriter()");
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the resource adapter
+ *
+ * @return The resource adapter
+ */
+ public ResourceAdapter getResourceAdapter()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getResourceAdapter()");
+ }
+
+ return _ra;
+ }
+
+ /**
+ * Set the resource adapter
+ *
+ * @param ra The resource adapter
+ * @throws ResourceException Thrown if incorrect resource adapter
+ */
+ public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setResourceAdapter(" + ra + ")");
+ }
+
+ if (!(ra instanceof QpidResourceAdapter))
+ {
+ throw new ResourceException("Resource adapter is " + ra);
+ }
+
+ this._ra = (QpidResourceAdapter)ra;
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ *
+ * @param obj Object with which to compare
+ * @return True if this object is the same as the obj argument; false otherwise.
+ */
+ @Override
+ public boolean equals(final Object obj)
+ {
+ if (obj instanceof QpidRAManagedConnectionFactory)
+ {
+ QpidRAManagedConnectionFactory other = (QpidRAManagedConnectionFactory)obj;
+
+ return _mcfProperties.equals(other.getProperties()) && _ra.equals(other.getResourceAdapter());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+    /**
+     * Return the hash code for the object.
+     * <p>
+     * The value is derived from the factory properties and, once one has been
+     * set, the resource adapter. It therefore changes when the properties change
+     * or when the resource adapter is associated.
+     *
+     * @return The hash code
+     */
+    @Override
+    public int hashCode()
+    {
+        int hash = _mcfProperties.hashCode();
+
+        // _ra stays null until setResourceAdapter() is called; treat it as
+        // contributing nothing rather than throwing a NullPointerException.
+        hash += 31 * (_ra == null ? 0 : _ra.hashCode());
+
+        return hash;
+    }
+
+ /**
+ * Get the default session type
+ *
+ * @return The value
+ */
+ public String getSessionDefaultType()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSessionDefaultType()");
+ }
+
+ return _mcfProperties.getSessionDefaultType();
+ }
+
+ /**
+ * Set the default session type
+ *
+ * @param type either javax.jms.Topic or javax.jms.Queue
+ */
+ public void setSessionDefaultType(final String type)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSessionDefaultType(" + type + ")");
+ }
+
+ _mcfProperties.setSessionDefaultType(type);
+ }
+
+ public String getClientID()
+ {
+ return _mcfProperties.getClientId();
+ }
+
+ public void setClientID(final String clientID)
+ {
+ _mcfProperties.setClientId(clientID);
+ }
+
+ public String getConnectionURL()
+ {
+ return _mcfProperties.getConnectionURL() ;
+ }
+
+ public void setConnectionURL(final String connectionURL)
+ {
+ _mcfProperties.setConnectionURL(connectionURL);
+ }
+
+ public String getPassword()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDefaultPassword()");
+ }
+ return _mcfProperties.getPassword();
+ }
+
+    /**
+     * Set the default password.
+     * <p>
+     * The value is masked in the trace log, in the same way the managed
+     * connection constructor masks it, so the secret never reaches a log file.
+     *
+     * @param defaultPassword The password
+     */
+    public void setPassword(final String defaultPassword)
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setDefaultPassword(****)");
+        }
+        _mcfProperties.setPassword(defaultPassword);
+    }
+
+ public String getUserName()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDefaultUsername()");
+ }
+ return _mcfProperties.getUserName();
+ }
+
+ public void setUserName(final String defaultUsername)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setDefaultUsername(" + defaultUsername + ")");
+ }
+ _mcfProperties.setUserName(defaultUsername);
+ }
+
+ public String getHost()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getHost()");
+ }
+ return _mcfProperties.getHost();
+ }
+
+ public void setHost(final String host)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setHost(" + host + ")");
+ }
+ _mcfProperties.setHost(host);
+ }
+
+ public Integer getPort()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPort()");
+ }
+ return _mcfProperties.getPort();
+ }
+
+ public void setPort(final Integer port)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPort(" + port + ")");
+ }
+ _mcfProperties.setPort(port);
+ }
+
+ public String getPath()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPath()");
+ }
+ return _mcfProperties.getPath();
+ }
+
+ public void setPath(final String path)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPath(" + path + ")");
+ }
+ _mcfProperties.setPath(path);
+ }
+
+ /**
+ * Get the useTryLock.
+ *
+ * @return the useTryLock.
+ */
+ public Integer getUseTryLock()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUseTryLock()");
+ }
+
+ return _mcfProperties.getUseTryLock();
+ }
+
+ /**
+ * Set the useTryLock.
+ *
+ * @param useTryLock the useTryLock.
+ */
+ public void setUseTryLock(final Integer useTryLock)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUseTryLock(" + useTryLock + ")");
+ }
+
+ _mcfProperties.setUseTryLock(useTryLock);
+ }
+
+ /**
+ * Get the connection metadata
+ *
+ * @return The metadata
+ */
+ public ConnectionMetaData getMetaData()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMetadata()");
+ }
+
+ return new QpidRAConnectionMetaData();
+ }
+
+ /**
+ * Get the default connection factory
+ *
+ * @return The factory
+ */
+ protected synchronized AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
+ {
+ if (_connectionFactory == null)
+ {
+ try
+ {
+ _connectionFactory = _ra.createAMQConnectionFactory(_mcfProperties);
+ }
+ catch (final QpidRAException qpidrae)
+ {
+ throw new ResourceException("Unexpected exception creating the connection factory", qpidrae) ;
+ }
+ }
+ return _connectionFactory;
+ }
+
+ /**
+ * Get a clean connection factory
+ *
+ * @return The factory
+ */
+ protected AMQConnectionFactory getCleanAMQConnectionFactory() throws ResourceException
+ {
+ try
+ {
+ return _ra.createAMQConnectionFactory(_mcfProperties);
+ }
+ catch (final QpidRAException qpidrae)
+ {
+ throw new ResourceException("Unexpected exception creating the connection factory", qpidrae) ;
+ }
+ }
+
+ /**
+ * Get the managed connection factory properties
+ *
+ * @return The properties
+ */
+ protected QpidRAMCFProperties getProperties()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getProperties()");
+ }
+
+ return _mcfProperties;
+ }
+
+ /**
+ * Get a connection request info instance
+ *
+ * @param info The instance that should be updated; may be <code>null</code>
+ * @return The instance
+ * @throws ResourceException
+ */
+ private QpidRAConnectionRequestInfo getCRI(final QpidRAConnectionRequestInfo info)
+ throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getCRI(" + info + ")");
+ }
+
+ if (info == null)
+ {
+ // Create a default one
+ return new QpidRAConnectionRequestInfo(_ra, _mcfProperties.getType());
+ }
+ else
+ {
+ // Fill the one with any defaults
+ info.setDefaults(_ra);
+ return info;
+ }
+ }
+
+ public Boolean getUseLocalTx()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUseLocalTx()");
+ }
+
+ return _mcfProperties.isUseLocalTx();
+ }
+
+ public void setUseLocalTx(final Boolean localTx)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUseLocalTx(" + localTx + ")");
+ }
+
+ _mcfProperties.setUseLocalTx(localTx);
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,457 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a map message.
+ *
+ * Every method traces its invocation (when trace logging is enabled) and then
+ * delegates to the wrapped {@link MapMessage}.
+ */
+public class QpidRAMapMessage extends QpidRAMessage implements MapMessage
+{
+    /** The logger */
+    private static final Logger _log = LoggerFactory.getLogger(QpidRAMapMessage.class);
+
+    /**
+     * Create a new wrapper
+     *
+     * @param message the message
+     * @param session the session
+     */
+    public QpidRAMapMessage(final MapMessage message, final QpidRASessionImpl session)
+    {
+        super(message, session);
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("constructor(" + message + ", " + session + ")");
+        }
+    }
+
+    /**
+     * Get the wrapped message viewed as a map message
+     * @return The wrapped message cast to {@link MapMessage}
+     */
+    private MapMessage mapMessage()
+    {
+        return (MapMessage)_message;
+    }
+
+    /**
+     * Get a boolean value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public boolean getBoolean(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getBoolean(" + name + ")");
+        }
+
+        return mapMessage().getBoolean(name);
+    }
+
+    /**
+     * Get a byte value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public byte getByte(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getByte(" + name + ")");
+        }
+
+        return mapMessage().getByte(name);
+    }
+
+    /**
+     * Get a byte array value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public byte[] getBytes(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getBytes(" + name + ")");
+        }
+
+        return mapMessage().getBytes(name);
+    }
+
+    /**
+     * Get a char value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public char getChar(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getChar(" + name + ")");
+        }
+
+        return mapMessage().getChar(name);
+    }
+
+    /**
+     * Get a double value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public double getDouble(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getDouble(" + name + ")");
+        }
+
+        return mapMessage().getDouble(name);
+    }
+
+    /**
+     * Get a float value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public float getFloat(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getFloat(" + name + ")");
+        }
+
+        return mapMessage().getFloat(name);
+    }
+
+    /**
+     * Get an int value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public int getInt(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getInt(" + name + ")");
+        }
+
+        return mapMessage().getInt(name);
+    }
+
+    /**
+     * Get a long value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public long getLong(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getLong(" + name + ")");
+        }
+
+        return mapMessage().getLong(name);
+    }
+
+    /**
+     * Get the map names
+     * @return The values
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Enumeration<?> getMapNames() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getMapNames()");
+        }
+
+        return mapMessage().getMapNames();
+    }
+
+    /**
+     * Get an object value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Object getObject(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getObject(" + name + ")");
+        }
+
+        return mapMessage().getObject(name);
+    }
+
+    /**
+     * Get a short value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public short getShort(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getShort(" + name + ")");
+        }
+
+        return mapMessage().getShort(name);
+    }
+
+    /**
+     * Get a string value from the wrapped map message
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public String getString(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getString(" + name + ")");
+        }
+
+        return mapMessage().getString(name);
+    }
+
+    /**
+     * Does the item exist
+     * @param name The name
+     * @return True / false
+     * @exception JMSException Thrown if an error occurs
+     */
+    public boolean itemExists(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("itemExists(" + name + ")");
+        }
+
+        return mapMessage().itemExists(name);
+    }
+
+    /**
+     * Set a boolean value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setBoolean(final String name, final boolean value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setBoolean(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setBoolean(name, value);
+    }
+
+    /**
+     * Set a byte value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setByte(final String name, final byte value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setByte(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setByte(name, value);
+    }
+
+    /**
+     * Set a portion of a byte array as a value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @param offset The offset
+     * @param length The length
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            // Render the array contents; concatenating a byte[] directly only prints its identity string
+            _log.trace("setBytes(" + name + ", " + java.util.Arrays.toString(value) + ", " + offset + ", " + length + ")");
+        }
+
+        mapMessage().setBytes(name, value, offset, length);
+    }
+
+    /**
+     * Set a byte array value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setBytes(final String name, final byte[] value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            // Render the array contents; concatenating a byte[] directly only prints its identity string
+            _log.trace("setBytes(" + name + ", " + java.util.Arrays.toString(value) + ")");
+        }
+
+        mapMessage().setBytes(name, value);
+    }
+
+    /**
+     * Set a char value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setChar(final String name, final char value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setChar(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setChar(name, value);
+    }
+
+    /**
+     * Set a double value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setDouble(final String name, final double value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setDouble(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setDouble(name, value);
+    }
+
+    /**
+     * Set a float value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setFloat(final String name, final float value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setFloat(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setFloat(name, value);
+    }
+
+    /**
+     * Set an int value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setInt(final String name, final int value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setInt(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setInt(name, value);
+    }
+
+    /**
+     * Set a long value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setLong(final String name, final long value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setLong(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setLong(name, value);
+    }
+
+    /**
+     * Set an object value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setObject(final String name, final Object value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setObject(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setObject(name, value);
+    }
+
+    /**
+     * Set a short value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setShort(final String name, final short value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setShort(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setShort(name, value);
+    }
+
+    /**
+     * Set a string value on the wrapped map message
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setString(final String name, final String value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setString(" + name + ", " + value + ")");
+        }
+
+        mapMessage().setString(name, value);
+    }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,782 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.Enumeration;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message.
+ *
+ * Every JMS method traces its invocation (when trace logging is enabled) and
+ * then delegates to the wrapped {@link Message}.
+ */
+public class QpidRAMessage implements Message
+{
+    /** The logger */
+    private static final Logger _log = LoggerFactory.getLogger(QpidRAMessage.class);
+
+    /** The message */
+    protected Message _message;
+
+    /** The session */
+    protected QpidRASessionImpl _session;
+
+    /**
+     * Create a new wrapper
+     * @param message the message
+     * @param session the session
+     */
+    public QpidRAMessage(final Message message, final QpidRASessionImpl session)
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+        }
+
+        this._message = message;
+        this._session = session;
+    }
+
+    /**
+     * Acknowledge the wrapped message after querying the session
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void acknowledge() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("acknowledge()");
+        }
+
+        _session.getSession(); // Check for closed
+        _message.acknowledge();
+    }
+
+    /**
+     * Clear body
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void clearBody() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("clearBody()");
+        }
+
+        _message.clearBody();
+    }
+
+    /**
+     * Clear properties
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void clearProperties() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("clearProperties()");
+        }
+
+        _message.clearProperties();
+    }
+
+    /**
+     * Get a boolean property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public boolean getBooleanProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getBooleanProperty(" + name + ")");
+        }
+
+        return _message.getBooleanProperty(name);
+    }
+
+    /**
+     * Get a byte property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public byte getByteProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getByteProperty(" + name + ")");
+        }
+
+        return _message.getByteProperty(name);
+    }
+
+    /**
+     * Get a double property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public double getDoubleProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getDoubleProperty(" + name + ")");
+        }
+
+        return _message.getDoubleProperty(name);
+    }
+
+    /**
+     * Get a float property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public float getFloatProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getFloatProperty(" + name + ")");
+        }
+
+        return _message.getFloatProperty(name);
+    }
+
+    /**
+     * Get an int property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public int getIntProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getIntProperty(" + name + ")");
+        }
+
+        return _message.getIntProperty(name);
+    }
+
+    /**
+     * Get correlation id
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public String getJMSCorrelationID() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSCorrelationID()");
+        }
+
+        return _message.getJMSCorrelationID();
+    }
+
+    /**
+     * Get correlation id as a byte array
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSCorrelationIDAsBytes()");
+        }
+
+        return _message.getJMSCorrelationIDAsBytes();
+    }
+
+    /**
+     * Get delivery mode
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public int getJMSDeliveryMode() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSDeliveryMode()");
+        }
+
+        return _message.getJMSDeliveryMode();
+    }
+
+    /**
+     * Get destination
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Destination getJMSDestination() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSDestination()");
+        }
+
+        return _message.getJMSDestination();
+    }
+
+    /**
+     * Get expiration
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public long getJMSExpiration() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSExpiration()");
+        }
+
+        return _message.getJMSExpiration();
+    }
+
+    /**
+     * Get message id
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public String getJMSMessageID() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSMessageID()");
+        }
+
+        return _message.getJMSMessageID();
+    }
+
+    /**
+     * Get priority
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public int getJMSPriority() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSPriority()");
+        }
+
+        return _message.getJMSPriority();
+    }
+
+    /**
+     * Get redelivered status
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public boolean getJMSRedelivered() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSRedelivered()");
+        }
+
+        return _message.getJMSRedelivered();
+    }
+
+    /**
+     * Get reply to destination
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Destination getJMSReplyTo() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSReplyTo()");
+        }
+
+        return _message.getJMSReplyTo();
+    }
+
+    /**
+     * Get timestamp
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public long getJMSTimestamp() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSTimestamp()");
+        }
+
+        return _message.getJMSTimestamp();
+    }
+
+    /**
+     * Get type
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public String getJMSType() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getJMSType()");
+        }
+
+        return _message.getJMSType();
+    }
+
+    /**
+     * Get a long property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public long getLongProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getLongProperty(" + name + ")");
+        }
+
+        return _message.getLongProperty(name);
+    }
+
+    /**
+     * Get an object property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Object getObjectProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getObjectProperty(" + name + ")");
+        }
+
+        return _message.getObjectProperty(name);
+    }
+
+    /**
+     * Get property names
+     * @return The values
+     * @exception JMSException Thrown if an error occurs
+     */
+    public Enumeration<?> getPropertyNames() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getPropertyNames()");
+        }
+
+        return _message.getPropertyNames();
+    }
+
+    /**
+     * Get a short property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public short getShortProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getShortProperty(" + name + ")");
+        }
+
+        return _message.getShortProperty(name);
+    }
+
+    /**
+     * Get a string property
+     * @param name The name
+     * @return The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public String getStringProperty(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getStringProperty(" + name + ")");
+        }
+
+        return _message.getStringProperty(name);
+    }
+
+    /**
+     * Check whether a property exists
+     * @param name The name
+     * @return True / false
+     * @exception JMSException Thrown if an error occurs
+     */
+    public boolean propertyExists(final String name) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("propertyExists(" + name + ")");
+        }
+
+        return _message.propertyExists(name);
+    }
+
+    /**
+     * Set a boolean property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setBooleanProperty(final String name, final boolean value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setBooleanProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setBooleanProperty(name, value);
+    }
+
+    /**
+     * Set a byte property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setByteProperty(final String name, final byte value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setByteProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setByteProperty(name, value);
+    }
+
+    /**
+     * Set a double property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setDoubleProperty(final String name, final double value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setDoubleProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setDoubleProperty(name, value);
+    }
+
+    /**
+     * Set a float property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setFloatProperty(final String name, final float value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setFloatProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setFloatProperty(name, value);
+    }
+
+    /**
+     * Set an int property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setIntProperty(final String name, final int value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setIntProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setIntProperty(name, value);
+    }
+
+    /**
+     * Set correlation id
+     * @param correlationID The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSCorrelationID(final String correlationID) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSCorrelationID(" + correlationID + ")");
+        }
+
+        _message.setJMSCorrelationID(correlationID);
+    }
+
+    /**
+     * Set correlation id from a byte array
+     * @param correlationID The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            // Render the array contents; concatenating a byte[] directly only prints its identity string
+            _log.trace("setJMSCorrelationIDAsBytes(" + java.util.Arrays.toString(correlationID) + ")");
+        }
+
+        _message.setJMSCorrelationIDAsBytes(correlationID);
+    }
+
+    /**
+     * Set delivery mode
+     * @param deliveryMode The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSDeliveryMode(final int deliveryMode) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSDeliveryMode(" + deliveryMode + ")");
+        }
+
+        _message.setJMSDeliveryMode(deliveryMode);
+    }
+
+    /**
+     * Set destination
+     * @param destination The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSDestination(final Destination destination) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSDestination(" + destination + ")");
+        }
+
+        _message.setJMSDestination(destination);
+    }
+
+    /**
+     * Set expiration
+     * @param expiration The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSExpiration(final long expiration) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSExpiration(" + expiration + ")");
+        }
+
+        _message.setJMSExpiration(expiration);
+    }
+
+    /**
+     * Set message id
+     * @param id The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSMessageID(final String id) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSMessageID(" + id + ")");
+        }
+
+        _message.setJMSMessageID(id);
+    }
+
+    /**
+     * Set priority
+     * @param priority The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSPriority(final int priority) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSPriority(" + priority + ")");
+        }
+
+        _message.setJMSPriority(priority);
+    }
+
+    /**
+     * Set redelivered status
+     * @param redelivered The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSRedelivered(final boolean redelivered) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSRedelivered(" + redelivered + ")");
+        }
+
+        _message.setJMSRedelivered(redelivered);
+    }
+
+    /**
+     * Set reply to
+     * @param replyTo The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSReplyTo(final Destination replyTo) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSReplyTo(" + replyTo + ")");
+        }
+
+        _message.setJMSReplyTo(replyTo);
+    }
+
+    /**
+     * Set timestamp
+     * @param timestamp The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSTimestamp(final long timestamp) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSTimestamp(" + timestamp + ")");
+        }
+
+        _message.setJMSTimestamp(timestamp);
+    }
+
+    /**
+     * Set type
+     * @param type The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setJMSType(final String type) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setJMSType(" + type + ")");
+        }
+
+        _message.setJMSType(type);
+    }
+
+    /**
+     * Set a long property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setLongProperty(final String name, final long value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setLongProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setLongProperty(name, value);
+    }
+
+    /**
+     * Set an object property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setObjectProperty(final String name, final Object value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setObjectProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setObjectProperty(name, value);
+    }
+
+    /**
+     * Set a short property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setShortProperty(final String name, final short value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setShortProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setShortProperty(name, value);
+    }
+
+    /**
+     * Set a string property
+     * @param name The name
+     * @param value The value
+     * @exception JMSException Thrown if an error occurs
+     */
+    public void setStringProperty(final String name, final String value) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setStringProperty(" + name + ", " + value + ")");
+        }
+
+        _message.setStringProperty(name, value);
+    }
+
+    /**
+     * Return the hash code of the wrapped message
+     * @return The hash code
+     */
+    @Override
+    public int hashCode()
+    {
+        return _message.hashCode();
+    }
+
+    /**
+     * Check for equality.
+     *
+     * Another wrapper is compared through its wrapped message; any other
+     * object is passed to the wrapped message's own equals.
+     * @param object The other object
+     * @return True / false
+     */
+    @Override
+    public boolean equals(final Object object)
+    {
+        // instanceof is already false for null, so no separate null check is needed
+        if (object instanceof QpidRAMessage)
+        {
+            return _message.equals(((QpidRAMessage)object)._message);
+        }
+
+        return _message.equals(object);
+    }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,353 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message consumer
+ *
+ */
+public class QpidRAMessageConsumer implements MessageConsumer
+{
+    /** The logger */
+    private static final Logger _log = LoggerFactory.getLogger(QpidRAMessageConsumer.class);
+
+    /** The wrapped message consumer that every operation is delegated to */
+    protected MessageConsumer _consumer;
+
+    /** Set once the wrapped consumer has been closed, so that it is closed at most once */
+    private final AtomicBoolean _closed = new AtomicBoolean();
+
+    /** The session for this consumer; used for state checks, locking and message wrapping */
+    protected QpidRASessionImpl _session;
+
+    /**
+     * Create a new wrapper
+     * @param consumer the consumer to delegate to
+     * @param session the session this consumer belongs to
+     */
+    public QpidRAMessageConsumer(final MessageConsumer consumer, final QpidRASessionImpl session)
+    {
+        this._consumer = consumer;
+        this._session = session;
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("new QpidRAMessageConsumer " + this +
+                       " consumer=" +
+                       Util.asString(consumer) +
+                       " session=" +
+                       session);
+        }
+    }
+
+    /**
+     * Close the wrapped consumer and deregister this wrapper from its session.
+     * The deregistration happens even when closing the wrapped consumer fails.
+     * @throws JMSException Thrown if an error occurs
+     */
+    public void close() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("close " + this);
+        }
+        try
+        {
+            closeConsumer();
+        }
+        finally
+        {
+            _session.removeConsumer(this);
+        }
+    }
+
+    /**
+     * Check state by delegating to the session's own state check
+     * @throws JMSException Thrown if an error occurs
+     */
+    void checkState() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("checkState()");
+        }
+        _session.checkState();
+    }
+
+    /**
+     * Get message listener as reported by the wrapped consumer.
+     * NOTE(review): {@link #setMessageListener} registers a wrapping listener, so this
+     * presumably returns that wrapper rather than the caller's own listener - confirm.
+     * @return The listener
+     * @throws JMSException Thrown if an error occurs
+     */
+    public MessageListener getMessageListener() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getMessageListener()");
+        }
+
+        checkState();
+        _session.checkStrict();
+        return _consumer.getMessageListener();
+    }
+
+    /**
+     * Set message listener. A non-null listener is wrapped so that the messages it
+     * receives are wrapped too; null is passed through unchanged.
+     * @param listener The listener, or null
+     * @throws JMSException Thrown if an error occurs
+     */
+    public void setMessageListener(final MessageListener listener) throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("setMessageListener(" + listener + ")");
+        }
+
+        _session.lock();
+        try
+        {
+            checkState();
+            _session.checkStrict();
+            // A null listener must reach the wrapped consumer as null, never as a wrapper around null
+            _consumer.setMessageListener(listener == null ? null : wrapMessageListener(listener));
+        }
+        finally
+        {
+            _session.unlock();
+        }
+    }
+
+    /**
+     * Get message selector
+     * @return The selector
+     * @throws JMSException Thrown if an error occurs
+     */
+    public String getMessageSelector() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("getMessageSelector()");
+        }
+
+        checkState();
+        return _consumer.getMessageSelector();
+    }
+
+    /**
+     * Receive the next message while holding the session lock.
+     * @return The wrapped message, or null if the session is not started or the
+     *         wrapped consumer returned null
+     * @throws JMSException Thrown if an error occurs
+     */
+    public Message receive() throws JMSException
+    {
+        _session.lock();
+        try
+        {
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("receive " + this);
+            }
+
+            checkState();
+            // Make an explicit start check otherwise qpid starts the dispatcher
+            return wrapReceived(_session.isStarted() ? _consumer.receive() : null);
+        }
+        finally
+        {
+            _session.unlock();
+        }
+    }
+
+    /**
+     * Receive the next message while holding the session lock.
+     * @param timeout The timeout value, passed unchanged to the wrapped consumer
+     * @return The wrapped message, or null if the session is not started or the
+     *         wrapped consumer returned null
+     * @throws JMSException Thrown if an error occurs
+     */
+    public Message receive(final long timeout) throws JMSException
+    {
+        _session.lock();
+        try
+        {
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("receive " + this + " timeout=" + timeout);
+            }
+
+            checkState();
+            // Make an explicit start check otherwise qpid starts the dispatcher
+            return wrapReceived(_session.isStarted() ? _consumer.receive(timeout) : null);
+        }
+        finally
+        {
+            _session.unlock();
+        }
+    }
+
+    /**
+     * Receive the next message, via the wrapped consumer's receiveNoWait, while
+     * holding the session lock.
+     * @return The wrapped message, or null if the session is not started or the
+     *         wrapped consumer returned null
+     * @throws JMSException Thrown if an error occurs
+     */
+    public Message receiveNoWait() throws JMSException
+    {
+        _session.lock();
+        try
+        {
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("receiveNoWait " + this);
+            }
+
+            checkState();
+            // Make an explicit start check otherwise qpid starts the dispatcher
+            return wrapReceived(_session.isStarted() ? _consumer.receiveNoWait() : null);
+        }
+        finally
+        {
+            _session.unlock();
+        }
+    }
+
+    /**
+     * Close the wrapped consumer, at most once however often this is called
+     * @throws JMSException Thrown if an error occurs
+     */
+    void closeConsumer() throws JMSException
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("closeConsumer()");
+        }
+
+        // getAndSet flips the flag atomically, so only the first caller reaches close()
+        if (!_closed.getAndSet(true))
+        {
+            _consumer.close();
+        }
+    }
+
+    /**
+     * Wrap message in the wrapper matching its most specific JMS message type
+     * @param message The message to be wrapped
+     * @return The wrapped message
+     */
+    Message wrapMessage(final Message message)
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("wrapMessage(" + Util.asString(message) + ")");
+        }
+
+        if (message instanceof BytesMessage)
+        {
+            return new QpidRABytesMessage((BytesMessage)message, _session);
+        }
+        if (message instanceof MapMessage)
+        {
+            return new QpidRAMapMessage((MapMessage)message, _session);
+        }
+        if (message instanceof ObjectMessage)
+        {
+            return new QpidRAObjectMessage((ObjectMessage)message, _session);
+        }
+        if (message instanceof StreamMessage)
+        {
+            return new QpidRAStreamMessage((StreamMessage)message, _session);
+        }
+        if (message instanceof TextMessage)
+        {
+            return new QpidRATextMessage((TextMessage)message, _session);
+        }
+        // No specialised type matched: fall back to the generic wrapper
+        return new QpidRAMessage(message, _session);
+    }
+
+    /**
+     * Wrap message listener
+     * @param listener The listener to be wrapped
+     * @return The wrapped listener
+     */
+    MessageListener wrapMessageListener(final MessageListener listener)
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("wrapMessageListener(" + listener + ")");
+        }
+
+        return new QpidRAMessageListener(listener, this);
+    }
+
+    /**
+     * Trace the outcome of a receive call and wrap the message if there is one
+     * @param message The message handed back by the wrapped consumer, possibly null
+     * @return The wrapped message, or null if there was no message
+     */
+    private Message wrapReceived(final Message message)
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("received " + this + " result=" + Util.asString(message));
+        }
+
+        return (message == null) ? null : wrapMessage(message);
+    }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message listener
+ */
+public class QpidRAMessageListener implements MessageListener
+{
+    /** The logger */
+    private static final Logger _log = LoggerFactory.getLogger(QpidRAMessageListener.class);
+
+    /** The listener that each wrapped message is forwarded to */
+    private final MessageListener _listener;
+
+    /** The consumer whose wrapMessage is applied to every incoming message */
+    private final QpidRAMessageConsumer _consumer;
+
+    /**
+     * Create a new wrapper
+     * @param listener the listener to forward to
+     * @param consumer the consumer used to wrap incoming messages
+     */
+    public QpidRAMessageListener(final MessageListener listener, final QpidRAMessageConsumer consumer)
+    {
+        this._listener = listener;
+        this._consumer = consumer;
+
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("constructor(" + listener + ", " + consumer + ")");
+        }
+    }
+
+    /**
+     * On message: wrap the incoming message and pass it to the wrapped listener
+     * @param message The message
+     */
+    public void onMessage(Message message)
+    {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("onMessage(" + Util.asString(message) + ")");
+        }
+
+        // The wrapped listener only ever sees the consumer's wrapper, never the raw message
+        final Message wrapped = _consumer.wrapMessage(message);
+        _listener.onMessage(wrapped);
+    }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org