You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC

svn commit: r1220336 [4/8] - in /qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ jca/ jca/example/ jca/example/conf/ jca/example/src/ jca/example/src/main/ jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,880 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+import javax.jms.XAQueueConnection;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ConnectionEventListener;
+import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.IllegalStateException;
+import javax.resource.spi.LocalTransaction;
+import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.ManagedConnectionMetaData;
+import javax.resource.spi.SecurityException;
+import javax.security.auth.Subject;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The managed connection
+ *
+ */
+public class QpidRAManagedConnection implements ManagedConnection, ExceptionListener
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAManagedConnection.class);
+
+   /** The managed connection factory */
+   private final QpidRAManagedConnectionFactory _mcf;
+
+   /** The connection request information */
+   private final QpidRAConnectionRequestInfo _cri;
+
+   /** The user name */
+   private final String _userName;
+
+   /** The password */
+   private final String _password;
+
+   /** Has the connection been destroyed */
+   private final AtomicBoolean _isDestroyed = new AtomicBoolean(false);
+
+   /** Event listeners */
+   private final List<ConnectionEventListener> _eventListeners;
+
+   /** Handles */
+   private final Set<QpidRASessionImpl> _handles;
+
+   /** Lock */
+   private ReentrantLock _lock = new ReentrantLock();
+
+   // Physical JMS connection stuff
+   private Connection _connection;
+
+   private XASession _xaSession;
+
+   private XAResource _xaResource;
+
+   private Session _session;
+
+   private final TransactionManager _tm;
+
+   private boolean _inManagedTx;
+
+   /**
+    * Constructor
+    * @param mcf The managed connection factory
+    * @param cri The connection request information
+    * @param userName The user name
+    * @param password The password
+    */
+   public QpidRAManagedConnection(final QpidRAManagedConnectionFactory mcf,
+                                     final QpidRAConnectionRequestInfo cri,
+                                     final TransactionManager tm,
+                                     final String userName,
+                                     final String password) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + mcf + ", " + cri + ", " + userName + ", ****)");
+      }
+
+      this._mcf = mcf;
+      this._cri = cri;
+      this._tm = tm;
+      this._userName = userName;
+      this._password = password;
+      _eventListeners = Collections.synchronizedList(new ArrayList<ConnectionEventListener>());
+      _handles = Collections.synchronizedSet(new HashSet<QpidRASessionImpl>());
+
+      try
+      {
+         setup();
+      }
+      catch (Throwable t)
+      {
+         try
+         {
+            destroy();
+         }
+         catch (Throwable ignored)
+         {
+         }
+         throw new ResourceException("Error during setup", t);
+      }
+   }
+
+   /**
+    * Get a connection
+    * @param subject The security subject
+    * @param cxRequestInfo The request info
+    * @return The connection
+    * @exception ResourceException Thrown if an error occurs
+    */
+   public synchronized Object getConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getConnection(" + subject + ", " + cxRequestInfo + ")");
+      }
+
+      // Check user first
+      QpidRACredential credential = QpidRACredential.getCredential(_mcf, subject, cxRequestInfo);
+
+      // Null users are allowed!
+      if (_userName != null && !_userName.equals(credential.getUserName()))
+      {
+         throw new SecurityException("Password credentials not the same, reauthentication not allowed");
+      }
+
+      if (_userName == null && credential.getUserName() != null)
+      {
+         throw new SecurityException("Password credentials not the same, reauthentication not allowed");
+      }
+
+      if (_isDestroyed.get())
+      {
+         throw new IllegalStateException("The managed connection is already destroyed");
+      }
+
+      QpidRASessionImpl session = new QpidRASessionImpl(this, (QpidRAConnectionRequestInfo)cxRequestInfo);
+      _handles.add(session);
+      return session;
+   }
+
+   /**
+    * Destroy all handles.
+    * @exception ResourceException Failed to close one or more handles.
+    */
+   private void destroyHandles() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("destroyHandles()");
+      }
+
+      try
+      {
+         if (_connection != null)
+         {
+            _connection.stop();
+         }
+      }
+      catch (Throwable t)
+      {
+         _log.trace("Ignored error stopping connection", t);
+      }
+
+      for (QpidRASessionImpl session : _handles)
+      {
+         session.destroy();
+      }
+
+      _handles.clear();
+   }
+
+   /**
+    * Destroy the physical connection.
+    * @exception ResourceException Could not property close the session and connection.
+    */
+   public void destroy() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("destroy()");
+      }
+
+      if (_isDestroyed.get() ||  _connection == null)
+      {
+         return;
+      }
+
+      _isDestroyed.set(true);
+
+      try
+      {
+         _connection.setExceptionListener(null);
+      }
+      catch (JMSException e)
+      {
+         _log.debug("Error unsetting the exception listener " + this, e);
+      }
+
+      destroyHandles();
+
+      try
+      {
+         try
+         {
+            if (_xaSession != null)
+            {
+               _xaSession.close();
+            }
+         }
+         catch (JMSException e)
+         {
+            _log.debug("Error closing session " + this, e);
+         }
+
+         if (_connection != null)
+         {
+            _connection.close();
+         }
+      }
+      catch (Throwable e)
+      {
+         throw new ResourceException("Could not properly close the session and connection", e);
+      }
+   }
+
+   /**
+    * Cleanup
+    * @exception ResourceException Thrown if an error occurs
+    */
+   public void cleanup() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("cleanup()");
+      }
+
+      if (_isDestroyed.get())
+      {
+         throw new IllegalStateException("ManagedConnection already destroyed");
+      }
+
+      destroyHandles();
+
+      _inManagedTx = false;
+
+      // I'm recreating the lock object when we return to the pool
+      // because it looks too nasty to expect the connection handle
+      // to unlock properly in certain race conditions
+      // where the dissociation of the managed connection is "random".
+      _lock = new ReentrantLock();
+   }
+
+   /**
+    * Move a handler from one mc to this one.
+    * @param obj An object of type QpidRASession.
+    * @throws ResourceException Failed to associate connection.
+    * @throws IllegalStateException ManagedConnection in an illegal state.
+    */
+   public void associateConnection(final Object obj) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("associateConnection(" + obj + ")");
+      }
+
+      if (!_isDestroyed.get() && obj instanceof QpidRASessionImpl)
+      {
+         QpidRASessionImpl h = (QpidRASessionImpl)obj;
+         h.setManagedConnection(this);
+         _handles.add(h);
+      }
+      else
+      {
+         throw new IllegalStateException("ManagedConnection in an illegal state");
+      }
+   }
+
+   public void checkTransactionActive() throws JMSException
+   {
+      // don't bother looking at the transaction if there's an active XID
+      if (!_inManagedTx && _tm != null)
+      {
+         try
+         {
+            Transaction tx = _tm.getTransaction();
+            if (tx != null)
+            {
+               int status = tx.getStatus();
+               // Only allow states that will actually succeed
+               if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING &&
+                   status != Status.STATUS_PREPARED &&
+                   status != Status.STATUS_COMMITTING)
+               {
+                  throw new javax.jms.IllegalStateException("Transaction " + tx + " not active");
+               }
+            }
+         }
+         catch (SystemException e)
+         {
+            JMSException jmsE = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction");
+            jmsE.initCause(e);
+            throw jmsE;
+         }
+      }
+   }
+
+
+   /**
+    * Aqquire a lock on the managed connection
+    */
+   protected void lock()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("lock()");
+      }
+
+      _lock.lock();
+   }
+
+   /**
+    * Aqquire a lock on the managed connection within the specified period
+    * @exception JMSException Thrown if an error occurs
+    */
+   protected void tryLock() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("tryLock()");
+      }
+
+      Integer tryLock = _mcf.getUseTryLock();
+      if (tryLock == null || tryLock.intValue() <= 0)
+      {
+         lock();
+         return;
+      }
+      try
+      {
+         if (_lock.tryLock(tryLock.intValue(), TimeUnit.SECONDS) == false)
+         {
+            throw new ResourceAllocationException("Unable to obtain lock in " + tryLock + " seconds: " + this);
+         }
+      }
+      catch (InterruptedException e)
+      {
+         throw new ResourceAllocationException("Interrupted attempting lock: " + this);
+      }
+   }
+
+   /**
+    * Unlock the managed connection
+    */
+   protected void unlock()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("unlock()");
+      }
+
+      if (_lock.isHeldByCurrentThread())
+      {
+         _lock.unlock();
+      }
+   }
+
+   /**
+    * Add a connection event listener.
+    * @param l The connection event listener to be added.
+    */
+   public void addConnectionEventListener(final ConnectionEventListener l)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("addConnectionEventListener(" + l + ")");
+      }
+
+      _eventListeners.add(l);
+   }
+
+   /**
+    * Remove a connection event listener.
+    * @param l The connection event listener to be removed.
+    */
+   public void removeConnectionEventListener(final ConnectionEventListener l)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("removeConnectionEventListener(" + l + ")");
+      }
+
+      _eventListeners.remove(l);
+   }
+
+   /**
+    * Get the XAResource for the connection.
+    * @return The XAResource for the connection.
+    * @exception ResourceException XA transaction not supported
+    */
+   public XAResource getXAResource() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getXAResource()");
+      }
+
+      //
+      // Spec says a mc must allways return the same XA resource,
+      // so we cache it.
+      //
+      if (_xaResource == null)
+      {
+            _xaResource = new QpidRAXAResource(this, _xaSession.getXAResource());
+      }
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("XAResource=" + _xaResource);
+      }
+
+      return _xaResource;
+   }
+
+   /**
+    * Get the location transaction for the connection.
+    * @return The local transaction for the connection.
+    * @exception ResourceException Thrown if operation fails.
+    */
+   public LocalTransaction getLocalTransaction() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getLocalTransaction()");
+      }
+
+      LocalTransaction tx = new QpidRALocalTransaction(this);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("LocalTransaction=" + tx);
+      }
+
+      return tx;
+   }
+
+   /**
+    * Get the meta data for the connection.
+    * @return The meta data for the connection.
+    * @exception ResourceException Thrown if the operation fails.
+    * @exception IllegalStateException Thrown if the managed connection already is destroyed.
+    */
+   public ManagedConnectionMetaData getMetaData() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMetaData()");
+      }
+
+      if (_isDestroyed.get())
+      {
+         throw new IllegalStateException("The managed connection is already destroyed");
+      }
+
+      return new QpidRAMetaData(this);
+   }
+
+   /**
+    * Set the log writer -- NOT SUPPORTED
+    * @param out The log writer
+    * @exception ResourceException If operation fails
+    */
+   public void setLogWriter(final PrintWriter out) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setLogWriter(" + out + ")");
+      }
+   }
+
+   /**
+    * Get the log writer -- NOT SUPPORTED
+    * @return Always null
+    * @exception ResourceException If operation fails
+    */
+   public PrintWriter getLogWriter() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getLogWriter()");
+      }
+
+      return null;
+   }
+
+   /**
+    * Notifies user of a JMS exception.
+    * @param exception The JMS exception
+    */
+   public void onException(final JMSException exception)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("onException(" + exception + ")");
+      }
+
+      if (_isDestroyed.get())
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("Ignoring error on already destroyed connection " + this, exception);
+         }
+         return;
+      }
+
+      _log.warn("Handling JMS exception failure: " + this, exception);
+
+      try
+      {
+         _connection.setExceptionListener(null);
+      }
+      catch (JMSException e)
+      {
+         _log.debug("Unable to unset exception listener", e);
+      }
+
+      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception);
+      sendEvent(event);
+   }
+
+   /**
+    * Get the session for this connection.
+    * @return The session
+    * @throws JMSException
+    */
+   protected Session getSession() throws JMSException
+   {
+      if(_xaSession != null && !_mcf.getUseLocalTx())
+      {
+          if (_log.isTraceEnabled())
+          {
+              _log.trace("getSession() -> XA session " + Util.asString(_xaSession));
+          }
+
+          return _xaSession;
+      }
+      else
+      {
+          if (_log.isTraceEnabled())
+          {
+              _log.trace("getSession() -> session " + Util.asString(_session));
+          }
+
+          return _session;
+      }
+   }
+
+   /**
+    * Send an event.
+    * @param event The event to send.
+    */
+   protected void sendEvent(final ConnectionEvent event)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("sendEvent(" + event + ")");
+      }
+
+      int type = event.getId();
+
+      // convert to an array to avoid concurrent modification exceptions
+      ConnectionEventListener[] list = _eventListeners.toArray(new ConnectionEventListener[_eventListeners.size()]);
+
+      for (ConnectionEventListener l : list)
+      {
+         switch (type)
+         {
+            case ConnectionEvent.CONNECTION_CLOSED:
+               l.connectionClosed(event);
+               break;
+
+            case ConnectionEvent.LOCAL_TRANSACTION_STARTED:
+               l.localTransactionStarted(event);
+               break;
+
+            case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED:
+               l.localTransactionCommitted(event);
+               break;
+
+            case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK:
+               l.localTransactionRolledback(event);
+               break;
+
+            case ConnectionEvent.CONNECTION_ERROR_OCCURRED:
+               l.connectionErrorOccurred(event);
+               break;
+
+            default:
+               throw new IllegalArgumentException("Illegal eventType: " + type);
+         }
+      }
+   }
+
+   /**
+    * Remove a handle from the handle map.
+    * @param handle The handle to remove.
+    */
+   protected void removeHandle(final QpidRASessionImpl handle)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("removeHandle(" + handle + ")");
+      }
+
+      _handles.remove(handle);
+   }
+
+   /**
+    * Get the request info for this connection.
+    * @return The connection request info for this connection.
+    */
+   protected QpidRAConnectionRequestInfo getCRI()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getCRI()");
+      }
+
+      return _cri;
+   }
+
+   /**
+    * Get the connection factory for this connection.
+    * @return The connection factory for this connection.
+    */
+   protected QpidRAManagedConnectionFactory getManagedConnectionFactory()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getManagedConnectionFactory()");
+      }
+
+      return _mcf;
+   }
+
+   /**
+    * Start the connection
+    * @exception JMSException Thrown if the connection cant be started
+    */
+   void start() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("start()");
+      }
+
+      if (_connection != null)
+      {
+         _connection.start();
+      }
+   }
+
+   /**
+    * Stop the connection
+    * @exception JMSException Thrown if the connection cant be stopped
+    */
+   void stop() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("stop()");
+      }
+
+      if (_connection != null)
+      {
+         _connection.stop();
+      }
+   }
+
+   /**
+    * Get the user name
+    * @return The user name
+    */
+   protected String getUserName()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUserName()");
+      }
+
+      return _userName;
+   }
+
+   /**
+    * Setup the connection.
+    * @exception ResourceException Thrown if a connection couldnt be created
+    */
+   private void setup() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setup()");
+      }
+
+      try
+      {
+         boolean transacted = _cri.isTransacted();
+         int acknowledgeMode =  Session.AUTO_ACKNOWLEDGE;
+         boolean localTx = _mcf.getUseLocalTx();
+
+         if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
+         {
+            if (_userName != null && _password != null)
+            {
+               if(!localTx)
+               {
+                    _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password);
+               }
+               else
+               {
+                    _connection = _mcf.getCleanAMQConnectionFactory().createTopicConnection();
+               }
+            }
+            else
+            {
+               if(!localTx)
+               {
+                   _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection();
+               }
+               else
+               {
+                   _connection = _mcf.getDefaultAMQConnectionFactory().createTopicConnection();
+               }
+            }
+
+            if(!localTx)
+            {
+                _xaSession = ((XATopicConnection)_connection).createXATopicSession();
+
+            }
+            else
+            {
+                _session =  ((TopicConnection)_connection).createTopicSession(localTx, acknowledgeMode);
+            }
+         }
+         else if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION)
+         {
+            if (_userName != null && _password != null)
+            {
+               if(!localTx)
+               {
+                    _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password);
+               }
+               else
+               {
+                    _connection = _mcf.getCleanAMQConnectionFactory().createQueueConnection();
+               }
+            }
+            else
+            {
+               if(!localTx)
+               {
+                   _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection();
+               }
+               else
+               {
+                   _connection = _mcf.getDefaultAMQConnectionFactory().createQueueConnection();
+               }
+            }
+
+            if(!localTx)
+            {
+                _xaSession = ((XAQueueConnection)_connection).createXAQueueSession();
+
+            }
+            else
+            {
+               _session =  ((QueueConnection)_connection).createQueueSession(localTx, acknowledgeMode);
+
+            }
+         }
+         else
+         {
+            if (_userName != null && _password != null)
+            {
+               if(!localTx)
+               {
+                    _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password);
+               }
+               else
+               {
+                    _connection = _mcf.getCleanAMQConnectionFactory().createConnection();
+               }
+            }
+            else
+            {
+               if(!localTx)
+               {
+                   _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection();
+               }
+               else
+               {
+                   _connection = _mcf.getDefaultAMQConnectionFactory().createConnection();
+               }
+            }
+
+            if(!localTx)
+            {
+                _xaSession = ((XAQueueConnection)_connection).createXASession();
+
+            }
+            else
+            {
+               _session =  ((QueueConnection)_connection).createSession(localTx, acknowledgeMode);
+
+            }
+         }
+
+        _connection.setExceptionListener(this);
+      }
+      catch (JMSException je)
+      {
+         throw new ResourceException(je.getMessage(), je);
+      }
+   }
+
+   protected void setInManagedTx(boolean inManagedTx)
+   {
+      this._inManagedTx = inManagedTx;
+   }
+
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnectionFactory.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,623 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.io.PrintWriter;
+import java.util.Set;
+
+import javax.jms.ConnectionMetaData;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.ManagedConnectionFactory;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterAssociation;
+import javax.security.auth.Subject;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Qpid ManagedConectionFactory
+ *
+ */
+public class QpidRAManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation
+{
+   /**
+    * Serial version UID
+    */
+   private static final long serialVersionUID = -8798804592247643959L;
+
+   /**
+    * The logger
+    */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAManagedConnectionFactory.class);
+
+   /**
+    * The resource adapter
+    */
+   private QpidResourceAdapter _ra;
+
+   /**
+    * Connection manager
+    */
+   private ConnectionManager _cm;
+
+   /**
+    * The managed connection factory properties
+    */
+   private final QpidRAMCFProperties _mcfProperties;
+
+   /**
+    * Connection Factory used if properties are set
+    */
+   private AMQConnectionFactory _connectionFactory;
+
+   /**
+    * Constructor
+    */
+   public QpidRAManagedConnectionFactory()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor()");
+      }
+
+      _ra = null;
+      _cm = null;
+      _mcfProperties = new QpidRAMCFProperties();
+   }
+
+   /**
+    * Creates a Connection Factory instance
+    *
+    * @return javax.resource.cci.ConnectionFactory instance
+    * @throws ResourceException Thrown if a connection factory cant be created
+    */
+   public Object createConnectionFactory() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createConnectionFactory()");
+      }
+
+      return createConnectionFactory(new QpidRAConnectionManager());
+   }
+
+   /**
+    * Creates a Connection Factory instance
+    *
+    * @param cxManager The connection manager
+    * @return javax.resource.cci.ConnectionFactory instance
+    * @throws ResourceException Thrown if a connection factory cant be created
+    */
+   public Object createConnectionFactory(final ConnectionManager cxManager) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createConnectionFactory(" + cxManager + ")");
+      }
+
+      _cm = cxManager;
+
+      QpidRAConnectionFactory cf = new QpidRAConnectionFactoryImpl(this, _cm);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("Created connection factory: " + cf +
+                                                     ", using connection manager: " +
+                                                     _cm);
+      }
+
+      return cf;
+   }
+
+   /**
+    * Creates a new physical connection to the underlying EIS resource manager.
+    *
+    * @param subject       Caller's security information
+    * @param cxRequestInfo Additional resource adapter specific connection request information
+    * @return The managed connection
+    * @throws ResourceException Thrown if a managed connection cant be created
+    */
+   public ManagedConnection createManagedConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("createManagedConnection(" + subject + ", " + cxRequestInfo + ")");
+      }
+
+      QpidRAConnectionRequestInfo cri = getCRI((QpidRAConnectionRequestInfo)cxRequestInfo);
+
+      QpidRACredential credential = QpidRACredential.getCredential(this, subject, cri);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("jms credential: " + credential);
+      }
+
+      QpidRAManagedConnection mc = new QpidRAManagedConnection(this,
+                                                                     cri,
+                                                                     _ra.getTM(),
+                                                                     credential.getUserName(),
+                                                                     credential.getPassword());
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("created new managed connection: " + mc);
+      }
+
+      return mc;
+   }
+
+   /**
+    * Returns a matched connection from the candidate set of connections.
+    *
+    * @param connectionSet The candidate connection set
+    * @param subject       Caller's security information
+    * @param cxRequestInfo Additional resource adapter specific connection request information
+    * @return The managed connection
+    * @throws ResourceException Thrown if no managed connection can be found
+    */
+   @SuppressWarnings("rawtypes")
+   public ManagedConnection matchManagedConnections(final Set connectionSet,
+                                                    final Subject subject,
+                                                    final ConnectionRequestInfo cxRequestInfo) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("matchManagedConnections(" + connectionSet +
+                                                     ", " +
+                                                     subject +
+                                                     ", " +
+                                                     cxRequestInfo +
+                                                     ")");
+      }
+
+      QpidRAConnectionRequestInfo cri = getCRI((QpidRAConnectionRequestInfo)cxRequestInfo);
+      QpidRACredential credential = QpidRACredential.getCredential(this, subject, cri);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("Looking for connection matching credentials: " + credential);
+      }
+
+      for (final Object obj : connectionSet)
+      {
+         if (obj instanceof QpidRAManagedConnection)
+         {
+            QpidRAManagedConnection mc = (QpidRAManagedConnection)obj;
+            ManagedConnectionFactory mcf = mc.getManagedConnectionFactory();
+
+            if ((mc.getUserName() == null || mc.getUserName() != null && mc.getUserName()
+                                                                           .equals(credential.getUserName())) && mcf.equals(this))
+            {
+               if (cri.equals(mc.getCRI()))
+               {
+                  if (_log.isTraceEnabled())
+                  {
+                     _log.trace("Found matching connection: " + mc);
+                  }
+
+                  return mc;
+               }
+            }
+         }
+      }
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("No matching connection was found");
+      }
+
+      return null;
+   }
+
+   /**
+    * Set the log writer -- NOT SUPPORTED
+    *
+    * @param out The writer
+    * @throws ResourceException Thrown if the writer cant be set
+    */
+   public void setLogWriter(final PrintWriter out) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setLogWriter(" + out + ")");
+      }
+   }
+
+   /**
+    * Get the log writer -- NOT SUPPORTED
+    *
+    * @return The writer
+    * @throws ResourceException Thrown if the writer cant be retrieved
+    */
+   public PrintWriter getLogWriter() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getLogWriter()");
+      }
+
+      return null;
+   }
+
+   /**
+    * Get the resource adapter
+    *
+    * @return The resource adapter
+    */
+   public ResourceAdapter getResourceAdapter()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getResourceAdapter()");
+      }
+
+      return _ra;
+   }
+
+   /**
+    * Set the resource adapter
+    *
+    * @param ra The resource adapter
+    * @throws ResourceException Thrown if incorrect resource adapter
+    */
+   public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setResourceAdapter(" + ra + ")");
+      }
+
+      if (!(ra instanceof QpidResourceAdapter))
+      {
+         throw new ResourceException("Resource adapter is " + ra);
+      }
+
+      this._ra = (QpidResourceAdapter)ra;
+   }
+
+   /**
+    * Indicates whether some other object is "equal to" this one.
+    *
+    * @param obj Object with which to compare
+    * @return True if this object is the same as the obj argument; false otherwise.
+    */
+   @Override
+   public boolean equals(final Object obj)
+   {
+      if (obj instanceof QpidRAManagedConnectionFactory)
+      {
+         QpidRAManagedConnectionFactory other = (QpidRAManagedConnectionFactory)obj;
+
+         return _mcfProperties.equals(other.getProperties()) && _ra.equals(other.getResourceAdapter());
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   /**
+    * Return the hash code for the object
+    *
+    * @return The hash code
+    */
+   @Override
+   public int hashCode()
+   {
+      int hash = _mcfProperties.hashCode();
+      hash += 31 * _ra.hashCode();
+
+      return hash;
+   }
+
+   /**
+    * Get the default session type
+    *
+    * @return The value
+    */
+   public String getSessionDefaultType()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSessionDefaultType()");
+      }
+
+      return _mcfProperties.getSessionDefaultType();
+   }
+
+   /**
+    * Set the default session type
+    *
+    * @param type either javax.jms.Topic or javax.jms.Queue
+    */
+   public void setSessionDefaultType(final String type)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSessionDefaultType(" + type + ")");
+      }
+
+      _mcfProperties.setSessionDefaultType(type);
+   }
+
+   public String getClientID()
+   {
+      return _mcfProperties.getClientId();
+   }
+
+   public void setClientID(final String clientID)
+   {
+      _mcfProperties.setClientId(clientID);
+   }
+
+   public String getConnectionURL()
+   {
+      return _mcfProperties.getConnectionURL() ;
+   }
+
+   public void setConnectionURL(final String connectionURL)
+   {
+      _mcfProperties.setConnectionURL(connectionURL);
+   }
+
+   public String getPassword()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDefaultPassword()");
+      }
+      return _mcfProperties.getPassword();
+   }
+
+   public void setPassword(final String defaultPassword)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDefaultPassword(" + defaultPassword + ")");
+      }
+      _mcfProperties.setPassword(defaultPassword);
+   }
+
+   public String getUserName()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDefaultUsername()");
+      }
+      return _mcfProperties.getUserName();
+   }
+
+   public void setUserName(final String defaultUsername)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDefaultUsername(" + defaultUsername + ")");
+      }
+      _mcfProperties.setUserName(defaultUsername);
+   }
+
+   public String getHost()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getHost()");
+      }
+      return _mcfProperties.getHost();
+   }
+
+   public void setHost(final String host)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setHost(" + host + ")");
+      }
+      _mcfProperties.setHost(host);
+   }
+
+   public Integer getPort()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPort()");
+      }
+      return _mcfProperties.getPort();
+   }
+
+   public void setPort(final Integer port)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPort(" + port + ")");
+      }
+      _mcfProperties.setPort(port);
+   }
+
+   public String getPath()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPath()");
+      }
+      return _mcfProperties.getPath();
+   }
+
+   public void setPath(final String path)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPath(" + path + ")");
+      }
+      _mcfProperties.setPath(path);
+   }
+
+   /**
+    * Get the useTryLock.
+    *
+    * @return the useTryLock.
+    */
+   public Integer getUseTryLock()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUseTryLock()");
+      }
+
+      return _mcfProperties.getUseTryLock();
+   }
+
+   /**
+    * Set the useTryLock.
+    *
+    * @param useTryLock the useTryLock.
+    */
+   public void setUseTryLock(final Integer useTryLock)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setUseTryLock(" + useTryLock + ")");
+      }
+
+      _mcfProperties.setUseTryLock(useTryLock);
+   }
+
+   /**
+    * Get the connection metadata
+    *
+    * @return The metadata
+    */
+   public ConnectionMetaData getMetaData()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMetadata()");
+      }
+
+      return new QpidRAConnectionMetaData();
+   }
+
+   /**
+    * Get the default connection factory
+    *
+    * @return The factory
+    */
+   protected synchronized AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
+   {
+      if (_connectionFactory == null)
+      {
+         try
+         {
+            _connectionFactory = _ra.createAMQConnectionFactory(_mcfProperties);
+         }
+         catch (final QpidRAException qpidrae)
+         {
+            throw new ResourceException("Unexpected exception creating the connection factory", qpidrae) ;
+         }
+      }
+      return _connectionFactory;
+   }
+
+   /**
+    * Get a clean connection factory
+    *
+    * @return The factory
+    */
+   protected AMQConnectionFactory getCleanAMQConnectionFactory() throws ResourceException
+   {
+      try
+      {
+         return _ra.createAMQConnectionFactory(_mcfProperties);
+      }
+      catch (final QpidRAException qpidrae)
+      {
+         throw new ResourceException("Unexpected exception creating the connection factory", qpidrae) ;
+      }
+   }
+
+   /**
+    * Get the managed connection factory properties
+    *
+    * @return The properties
+    */
+   protected QpidRAMCFProperties getProperties()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getProperties()");
+      }
+
+      return _mcfProperties;
+   }
+
+   /**
+    * Get a connection request info instance
+    *
+    * @param info The instance that should be updated; may be <code>null</code>
+    * @return The instance
+    * @throws ResourceException
+    */
+   private QpidRAConnectionRequestInfo getCRI(final QpidRAConnectionRequestInfo info)
+      throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getCRI(" + info + ")");
+      }
+
+      if (info == null)
+      {
+         // Create a default one
+         return new QpidRAConnectionRequestInfo(_ra, _mcfProperties.getType());
+      }
+      else
+      {
+         // Fill the one with any defaults
+         info.setDefaults(_ra);
+         return info;
+      }
+   }
+
+   public Boolean getUseLocalTx()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUseLocalTx()");
+      }
+
+      return _mcfProperties.isUseLocalTx();
+   }
+
+   public void setUseLocalTx(final Boolean localTx)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setUseLocalTx(" + localTx + ")");
+      }
+
+      _mcfProperties.setUseLocalTx(localTx);
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMapMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,457 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message
+ *
+ */
+public class QpidRAMapMessage extends QpidRAMessage implements MapMessage
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAMapMessage.class);
+
+   /**
+    * Create a new wrapper
+    *
+    * @param message the message
+    * @param session the session
+    */
+   public QpidRAMapMessage(final MapMessage message, final QpidRASessionImpl session)
+   {
+      super(message, session);
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + message + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getBoolean(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getBoolean(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getBoolean(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public byte getByte(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getByte(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getByte(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public byte[] getBytes(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getBytes(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getBytes(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public char getChar(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getChar(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getChar(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public double getDouble(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDouble(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getDouble(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public float getFloat(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getFloat(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getFloat(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int getInt(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getInt(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getInt(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long getLong(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getLong(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getLong(name);
+   }
+
+   /**
+    * Get the map names
+    * @return The values
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Enumeration<?> getMapNames() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMapNames()");
+      }
+
+      return ((MapMessage)_message).getMapNames();
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Object getObject(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getObject(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getObject(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public short getShort(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getShort(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getShort(name);
+   }
+
+   /**
+    * Get
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getString(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getString(" + name + ")");
+      }
+
+      return ((MapMessage)_message).getString(name);
+   }
+
+   /**
+    * Does the item exist
+    * @param name The name
+    * @return True / false
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean itemExists(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("itemExists(" + name + ")");
+      }
+
+      return ((MapMessage)_message).itemExists(name);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setBoolean(final String name, final boolean value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setBoolean(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setBoolean(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setByte(final String name, final byte value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setByte(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setByte(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @param offset The offset
+    * @param length The length
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setBytes(" + name + ", " + value + ", " + offset + ", " + length + ")");
+      }
+
+      ((MapMessage)_message).setBytes(name, value, offset, length);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setBytes(final String name, final byte[] value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setBytes(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setBytes(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setChar(final String name, final char value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setChar(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setChar(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setDouble(final String name, final double value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDouble(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setDouble(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setFloat(final String name, final float value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setFloat(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setFloat(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setInt(final String name, final int value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setInt(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setInt(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setLong(final String name, final long value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setLong(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setLong(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setObject(final String name, final Object value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setObject(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setObject(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setShort(final String name, final short value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setShort(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setShort(name, value);
+   }
+
+   /**
+    * Set
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setString(final String name, final String value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setString(" + name + ", " + value + ")");
+      }
+
+      ((MapMessage)_message).setString(name, value);
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessage.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,782 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.Enumeration;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message
+ */
+public class QpidRAMessage implements Message
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAMessage.class);
+
+   /** The message */
+   protected Message _message;
+
+   /** The session */
+   protected QpidRASessionImpl _session;
+
+   /**
+    * Create a new wrapper
+    * @param message the message
+    * @param session the session
+    */
+   public QpidRAMessage(final Message message, final QpidRASessionImpl session)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + Util.asString(message) + ", " + session + ")");
+      }
+
+      this._message = message;
+      this._session = session;
+   }
+
+   /**
+    * Acknowledge
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void acknowledge() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("acknowledge()");
+      }
+
+      _session.getSession(); // Check for closed
+      _message.acknowledge();
+   }
+
+   /**
+    * Clear body
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void clearBody() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("clearBody()");
+      }
+
+      _message.clearBody();
+   }
+
+   /**
+    * Clear properties
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void clearProperties() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("clearProperties()");
+      }
+
+      _message.clearProperties();
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getBooleanProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getBooleanProperty(" + name + ")");
+      }
+
+      return _message.getBooleanProperty(name);
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public byte getByteProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getByteProperty(" + name + ")");
+      }
+
+      return _message.getByteProperty(name);
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public double getDoubleProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDoubleProperty(" + name + ")");
+      }
+
+      return _message.getDoubleProperty(name);
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public float getFloatProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getFloatProperty(" + name + ")");
+      }
+
+      return _message.getFloatProperty(name);
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int getIntProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getIntProperty(" + name + ")");
+      }
+
+      return _message.getIntProperty(name);
+   }
+
+   /**
+    * Get correlation id
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getJMSCorrelationID() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSCorrelationID()");
+      }
+
+      return _message.getJMSCorrelationID();
+   }
+
+   /**
+    * Get correlation id
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSCorrelationIDAsBytes()");
+      }
+
+      return _message.getJMSCorrelationIDAsBytes();
+   }
+
+   /**
+    * Get delivery mode
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int getJMSDeliveryMode() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSDeliveryMode()");
+      }
+
+      return _message.getJMSDeliveryMode();
+   }
+
+   /**
+    * Get destination
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Destination getJMSDestination() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSDestination()");
+      }
+
+      return _message.getJMSDestination();
+   }
+
+   /**
+    * Get expiration
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long getJMSExpiration() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSExpiration()");
+      }
+
+      return _message.getJMSExpiration();
+   }
+
+   /**
+    * Get message id
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getJMSMessageID() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSMessageID()");
+      }
+
+      return _message.getJMSMessageID();
+   }
+
+   /**
+    * Get priority
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int getJMSPriority() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSPriority()");
+      }
+
+      return _message.getJMSPriority();
+   }
+
+   /**
+    * Get redelivered status
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getJMSRedelivered() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSRedelivered()");
+      }
+
+      return _message.getJMSRedelivered();
+   }
+
+   /**
+    * Get reply to destination
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Destination getJMSReplyTo() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSReplyTo()");
+      }
+
+      return _message.getJMSReplyTo();
+   }
+
+   /**
+    * Get timestamp
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long getJMSTimestamp() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSTimestamp()");
+      }
+
+      return _message.getJMSTimestamp();
+   }
+
+   /**
+    * Get type
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getJMSType() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getJMSType()");
+      }
+
+      return _message.getJMSType();
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long getLongProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getLongProperty(" + name + ")");
+      }
+
+      return _message.getLongProperty(name);
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Object getObjectProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getObjectProperty(" + name + ")");
+      }
+
+      return _message.getObjectProperty(name);
+   }
+
+   /**
+    * Get property names
+    * @return The values
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Enumeration<?> getPropertyNames() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPropertyNames()");
+      }
+
+      return _message.getPropertyNames();
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public short getShortProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getShortProperty(" + name + ")");
+      }
+
+      return _message.getShortProperty(name);
+   }
+
+   /**
+    * Get property
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getStringProperty(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getStringProperty(" + name + ")");
+      }
+
+      return _message.getStringProperty(name);
+   }
+
+   /**
+    * Do property exist
+    * @param name The name
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean propertyExists(final String name) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("propertyExists(" + name + ")");
+      }
+
+      return _message.propertyExists(name);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setBooleanProperty(final String name, final boolean value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setBooleanProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setBooleanProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setByteProperty(final String name, final byte value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setByteProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setByteProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setDoubleProperty(final String name, final double value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDoubleProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setDoubleProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setFloatProperty(final String name, final float value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setFloatProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setFloatProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setIntProperty(final String name, final int value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setIntProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setIntProperty(name, value);
+   }
+
+   /**
+    * Set correlation id
+    * @param correlationID The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSCorrelationID(final String correlationID) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSCorrelationID(" + correlationID + ")");
+      }
+
+      _message.setJMSCorrelationID(correlationID);
+   }
+
+   /**
+    * Set correlation id
+    * @param correlationID The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSCorrelationIDAsBytes(" + correlationID + ")");
+      }
+
+      _message.setJMSCorrelationIDAsBytes(correlationID);
+   }
+
+   /**
+    * Set delivery mode
+    * @param deliveryMode The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSDeliveryMode(final int deliveryMode) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSDeliveryMode(" + deliveryMode + ")");
+      }
+
+      _message.setJMSDeliveryMode(deliveryMode);
+   }
+
+   /**
+    * Set destination
+    * @param destination The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSDestination(final Destination destination) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSDestination(" + destination + ")");
+      }
+
+      _message.setJMSDestination(destination);
+   }
+
+   /**
+    * Set expiration
+    * @param expiration The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSExpiration(final long expiration) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSExpiration(" + expiration + ")");
+      }
+
+      _message.setJMSExpiration(expiration);
+   }
+
+   /**
+    * Set message id
+    * @param id The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSMessageID(final String id) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSMessageID(" + id + ")");
+      }
+
+      _message.setJMSMessageID(id);
+   }
+
+   /**
+    * Set priority
+    * @param priority The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSPriority(final int priority) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSPriority(" + priority + ")");
+      }
+
+      _message.setJMSPriority(priority);
+   }
+
+   /**
+    * Set redelivered status
+    * @param redelivered The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSRedelivered(final boolean redelivered) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSRedelivered(" + redelivered + ")");
+      }
+
+      _message.setJMSRedelivered(redelivered);
+   }
+
+   /**
+    * Set reply to
+    * @param replyTo The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSReplyTo(final Destination replyTo) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSReplyTo(" + replyTo + ")");
+      }
+
+      _message.setJMSReplyTo(replyTo);
+   }
+
+   /**
+    * Set timestamp
+    * @param timestamp The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSTimestamp(final long timestamp) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSTimestamp(" + timestamp + ")");
+      }
+
+      _message.setJMSTimestamp(timestamp);
+   }
+
+   /**
+    * Set type
+    * @param type The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setJMSType(final String type) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setJMSType(" + type + ")");
+      }
+
+      _message.setJMSType(type);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setLongProperty(final String name, final long value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setLongProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setLongProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setObjectProperty(final String name, final Object value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setObjectProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setObjectProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setShortProperty(final String name, final short value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setShortProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setShortProperty(name, value);
+   }
+
+   /**
+    * Set property
+    * @param name The name
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setStringProperty(final String name, final String value) throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setStringProperty(" + name + ", " + value + ")");
+      }
+
+      _message.setStringProperty(name, value);
+   }
+
+   /**
+    * Return the hash code
+    * @return The hash code
+    */
+   @Override
+   public int hashCode()
+   {
+      return _message.hashCode();
+   }
+
+   /**
+    * Check for equality
+    * @param object The other object
+    * @return True / false
+    */
+   @Override
+   public boolean equals(final Object object)
+   {
+      if (object != null && object instanceof QpidRAMessage)
+      {
+         return _message.equals(((QpidRAMessage)object)._message);
+      }
+      else
+      {
+         return _message.equals(object);
+      }
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageConsumer.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,353 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message consumer
+ *
+ */
+public class QpidRAMessageConsumer implements MessageConsumer
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAMessageConsumer.class);
+
+   /** The wrapped message consumer */
+   protected MessageConsumer _consumer;
+   /** The closed flag */
+   private AtomicBoolean _closed = new AtomicBoolean();
+
+   /** The session for this consumer */
+   protected QpidRASessionImpl _session;
+
+   /**
+    * Create a new wrapper
+    * @param consumer the consumer
+    * @param session the session
+    */
+   public QpidRAMessageConsumer(final MessageConsumer consumer, final QpidRASessionImpl session)
+   {
+      this._consumer = consumer;
+      this._session = session;
+
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("new QpidRAMessageConsumer " + this +
+                                            " consumer=" +
+                                            Util.asString(consumer) +
+                                            " session=" +
+                                            session);
+      }
+   }
+
+   /**
+    * Close
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void close() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("close " + this);
+      }
+      try
+      {
+         closeConsumer();
+      }
+      finally
+      {
+         _session.removeConsumer(this);
+      }
+   }
+
+   /**
+    * Check state
+    * @exception JMSException Thrown if an error occurs
+    */
+   void checkState() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("checkState()");
+      }
+      _session.checkState();
+   }
+
+   /**
+    * Get message listener
+    * @return The listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public MessageListener getMessageListener() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMessageListener()");
+      }
+
+      checkState();
+      _session.checkStrict();
+      return _consumer.getMessageListener();
+   }
+
+   /**
+    * Set message listener
+    * @param listener The listener
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setMessageListener(final MessageListener listener) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         checkState();
+         _session.checkStrict();
+         if (listener == null)
+         {
+            _consumer.setMessageListener(null);
+         }
+         else
+         {
+            _consumer.setMessageListener(wrapMessageListener(listener));
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Get message selector
+    * @return The selector
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getMessageSelector() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMessageSelector()");
+      }
+
+      checkState();
+      return _consumer.getMessageSelector();
+   }
+
+   /**
+    * Receive
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message receive() throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("receive " + this);
+         }
+
+         checkState();
+         // Make an explicit start check otherwise qpid starts the dispatcher
+         Message message = (_session.isStarted() ? _consumer.receive() : null);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("received " + this + " result=" + Util.asString(message));
+         }
+
+         if (message == null)
+         {
+            return null;
+         }
+         else
+         {
+            return wrapMessage(message);
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Receive
+    * @param timeout The timeout value
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message receive(final long timeout) throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("receive " + this + " timeout=" + timeout);
+         }
+
+         checkState();
+         // Make an explicit start check otherwise qpid starts the dispatcher
+         Message message = (_session.isStarted() ? _consumer.receive(timeout) : null);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("received " + this + " result=" + Util.asString(message));
+         }
+
+         if (message == null)
+         {
+            return null;
+         }
+         else
+         {
+            return wrapMessage(message);
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Receive
+    * @return The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Message receiveNoWait() throws JMSException
+   {
+      _session.lock();
+      try
+      {
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("receiveNoWait " + this);
+         }
+
+         checkState();
+         // Make an explicit start check otherwise qpid starts the dispatcher
+         Message message = (_session.isStarted() ? _consumer.receiveNoWait() : null);
+
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("received " + this + " result=" + Util.asString(message));
+         }
+
+         if (message == null)
+         {
+            return null;
+         }
+         else
+         {
+            return wrapMessage(message);
+         }
+      }
+      finally
+      {
+         _session.unlock();
+      }
+   }
+
+   /**
+    * Close consumer
+    * @exception JMSException Thrown if an error occurs
+    */
+   void closeConsumer() throws JMSException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("closeConsumer()");
+      }
+
+      if (!_closed.getAndSet(true))
+      {
+         _consumer.close();
+      }
+   }
+
+   /**
+    * Wrap message
+    * @param message The message to be wrapped
+    * @return The wrapped message
+    */
+   Message wrapMessage(final Message message)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("wrapMessage(" + Util.asString(message) + ")");
+      }
+
+      if (message instanceof BytesMessage)
+      {
+         return new QpidRABytesMessage((BytesMessage)message, _session);
+      }
+      else if (message instanceof MapMessage)
+      {
+         return new QpidRAMapMessage((MapMessage)message, _session);
+      }
+      else if (message instanceof ObjectMessage)
+      {
+         return new QpidRAObjectMessage((ObjectMessage)message, _session);
+      }
+      else if (message instanceof StreamMessage)
+      {
+         return new QpidRAStreamMessage((StreamMessage)message, _session);
+      }
+      else if (message instanceof TextMessage)
+      {
+         return new QpidRATextMessage((TextMessage)message, _session);
+      }
+      return new QpidRAMessage(message, _session);
+   }
+
+   /**
+    * Wrap message listener
+    * @param listener The listener to be wrapped
+    * @return The wrapped listener
+    */
+   MessageListener wrapMessageListener(final MessageListener listener)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMessageSelector()");
+      }
+
+      return new QpidRAMessageListener(listener, this);
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidRAMessageListener.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper for a message listener
+ */
+public class QpidRAMessageListener implements MessageListener
+{
+   /** The logger */
+   private static final Logger _log = LoggerFactory.getLogger(QpidRAMessageListener.class);
+
+   /** The message listener */
+   private final MessageListener _listener;
+
+   /** The consumer */
+   private final QpidRAMessageConsumer _consumer;
+
+   /**
+    * Create a new wrapper
+    * @param listener the listener
+    * @param consumer the consumer
+    */
+   public QpidRAMessageListener(final MessageListener listener, final QpidRAMessageConsumer consumer)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + listener + ", " + consumer + ")");
+      }
+
+      this._listener = listener;
+      this._consumer = consumer;
+   }
+
+   /**
+    * On message
+    * @param message The message
+    */
+   public void onMessage(Message message)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("onMessage(" + Util.asString(message) + ")");
+      }
+
+      message = _consumer.wrapMessage(message);
+      _listener.onMessage(message);
+   }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org