You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC

svn commit: r1220336 [7/8] - in /qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ jca/ jca/example/ jca/example/conf/ jca/example/src/ jca/example/src/main/ jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,820 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.inflow.QpidActivation;
+import org.apache.qpid.ra.inflow.QpidActivationSpec;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The resource adapter for Qpid
+ *
+ */
+public class QpidResourceAdapter implements ResourceAdapter, Serializable
+{
+   /**
+    *
+    */
+   private static final long serialVersionUID = -2446231446818098726L;
+
+   /**
+    * The logger
+    */
+   private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+
+   /**
+    * The bootstrap context
+    */
+   private BootstrapContext _ctx;
+
+   /**
+    * The resource adapter properties
+    */
+   private final QpidRAProperties _raProperties;
+
+   /**
+    * Has the factory been configured
+    */
+   private final AtomicBoolean _configured;
+
+   /**
+    * The activations by activation spec
+    */
+   private final Map<ActivationSpec, QpidActivation> _activations;
+
+   private AMQConnectionFactory _defaultAMQConnectionFactory;
+
+   private TransactionManager _tm;
+
+   /**
+    * Constructor
+    */
+   public QpidResourceAdapter()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor()");
+      }
+
+      _raProperties = new QpidRAProperties();
+      _configured = new AtomicBoolean(false);
+      _activations = new ConcurrentHashMap<ActivationSpec, QpidActivation>();
+   }
+
+   public TransactionManager getTM()
+   {
+      return _tm;
+   }
+
+   /**
+    * Endpoint activation
+    *
+    * @param endpointFactory The endpoint factory
+    * @param spec            The activation spec
+    * @throws ResourceException Thrown if an error occurs
+    */
+   public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) throws ResourceException
+   {
+      // The first caller to flip the flag builds the default connection factory.
+      if (!_configured.getAndSet(true))
+      {
+         try
+         {
+            setup();
+         }
+         catch (QpidRAException e)
+         {
+            // Clear the flag again so that a later call retries setup() instead of
+            // carrying on as "configured" without a default connection factory.
+            _configured.set(false);
+            throw new ResourceException("Unable to create activation", e);
+         }
+      }
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
+      }
+
+      // Register the activation under its spec before starting it, so that
+      // endpointDeactivation() and stop() can find it.
+      QpidActivation activation = new QpidActivation(this, endpointFactory, (QpidActivationSpec)spec);
+      _activations.put(spec, activation);
+      activation.start();
+   }
+
+   /**
+    * Endpoint deactivation
+    *
+    * @param endpointFactory The endpoint factory
+    * @param spec            The activation spec
+    */
+   public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("endpointDeactivation(" + endpointFactory + ", " + spec + ")");
+      }
+
+      QpidActivation activation = _activations.remove(spec);
+      if (activation != null)
+      {
+         activation.stop();
+      }
+   }
+
+   /**
+    * Get XA resources
+    *
+    * @param specs The activation specs
+    * @return The XA resources
+    * @throws ResourceException Thrown if an error occurs or unsupported
+    */
+   public XAResource[] getXAResources(final ActivationSpec[] specs) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getXAResources(" + specs + ")");
+      }
+
+      return null;
+   }
+
+   /**
+    * Start
+    *
+    * @param ctx The bootstrap context
+    * @throws ResourceAdapterInternalException
+    *          Thrown if an error occurs
+    */
+   public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("start(" + ctx + ")");
+      }
+
+      locateTM();
+
+      this._ctx = ctx;
+
+      _log.info("Qpid resource adapter started");
+   }
+
+   /**
+    * Stop
+    */
+   public void stop()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("stop()");
+      }
+
+      for (Map.Entry<ActivationSpec, QpidActivation> entry : _activations.entrySet())
+      {
+         try
+         {
+            entry.getValue().stop();
+         }
+         catch (Exception ignored)
+         {
+            _log.debug("Ignored", ignored);
+         }
+      }
+
+      _activations.clear();
+
+      _log.info("Qpid resource adapter stopped");
+   }
+
+   /**
+    * Get the user name
+    *
+    * @return The value
+    */
+   public String getDefaultUserName()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUserName()");
+      }
+
+      return _raProperties.getUserName();
+   }
+
+   /**
+    * Set the user name
+    *
+    * @param userName The value
+    */
+   public void setDefaultUserName(final String userName)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setUserName(" + userName + ")");
+      }
+
+      _raProperties.setUserName(userName);
+   }
+
+   /**
+    * Get the password
+    *
+    * @return The value
+    */
+   public String getDefaultPassword()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPassword()");
+      }
+
+      return _raProperties.getPassword();
+   }
+
+   /**
+    * Set the password
+    *
+    * @param password The value
+    */
+   public void setDefaultPassword(final String password)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPassword(****)");
+      }
+
+      _raProperties.setPassword(password);
+   }
+
+   /**
+    * Get the client ID
+    *
+    * @return The value
+    */
+   public String getClientId()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getClientID()");
+      }
+
+      return _raProperties.getClientId();
+   }
+
+   /**
+    * Set the client ID
+    *
+    * @param clientID The client id
+    */
+   public void setClientId(final String clientID)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setClientID(" + clientID + ")");
+      }
+
+      _raProperties.setClientId(clientID);
+   }
+
+   /**
+    * Get the host
+    *
+    * @return The value
+    */
+   public String getHost()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getHost()");
+      }
+
+      return _raProperties.getHost();
+   }
+
+   /**
+    * Set the host
+    *
+    * @param host The host
+    */
+   public void setHost(final String host)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setHost(" + host + ")");
+      }
+
+      _raProperties.setHost(host);
+   }
+
+   /**
+    * Get the port
+    *
+    * @return The value
+    */
+   public Integer getPort()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPort()");
+      }
+
+      return _raProperties.getPort();
+   }
+
+   /**
+    * Set the port
+    *
+    * @param port The port
+    */
+   public void setPort(final Integer port)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPort(" + port + ")");
+      }
+
+      _raProperties.setPort(port);
+   }
+
+   /**
+    * Get the path
+    *
+    * @return The value
+    */
+   public String getPath()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPath()");
+      }
+
+      return _raProperties.getPath();
+   }
+
+   /**
+    * Set the path
+    *
+    * @param path The path
+    */
+   public void setPath(final String path)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPath(" + path + ")");
+      }
+
+      _raProperties.setPath(path);
+   }
+
+   /**
+    * Get the connection url
+    *
+    * @return The value
+    */
+   public String getConnectionURL()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getConnectionURL()");
+      }
+
+      return _raProperties.getConnectionURL();
+   }
+
+   /**
+    * Set the connection url
+    *
+    * @param connectionURL The connection url
+    */
+   public void setConnectionURL(final String connectionURL)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setConnectionURL(" + connectionURL + ")");
+      }
+
+      _raProperties.setConnectionURL(connectionURL);
+   }
+
+   /**
+    * Get the transaction manager locator class
+    *
+    * @return The value
+    */
+   public String getTransactionManagerLocatorClass()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransactionManagerLocatorClass()");
+      }
+
+      return _raProperties.getTransactionManagerLocatorClass();
+   }
+
+   /**
+    * Set the transaction manager locator class
+    *
+    * @param locator The transaction manager locator class
+    */
+   public void setTransactionManagerLocatorClass(final String locator)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTransactionManagerLocatorClass(" + locator + ")");
+      }
+
+      _raProperties.setTransactionManagerLocatorClass(locator);
+   }
+
+   /**
+    * Get the transaction manager locator method
+    *
+    * @return The value
+    */
+   public String getTransactionManagerLocatorMethod()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransactionManagerLocatorMethod()");
+      }
+
+      return _raProperties.getTransactionManagerLocatorMethod();
+   }
+
+   /**
+    * Set the transaction manager locator method
+    *
+    * @param method The transaction manager locator method
+    */
+   public void setTransactionManagerLocatorMethod(final String method)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTransactionManagerLocatorMethod(" + method + ")");
+      }
+
+      _raProperties.setTransactionManagerLocatorMethod(method);
+   }
+
+   /**
+    * Get the use local transaction flag
+    *
+    * @return The value
+    */
+   public Boolean getUseLocalTx()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getUseLocalTx()");
+      }
+
+      return _raProperties.getUseLocalTx();
+   }
+
+   /**
+    * Set the use local transaction flag
+    *
+    * @param localTx The value
+    */
+   public void setUseLocalTx(final Boolean localTx)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setUseLocalTx(" + localTx + ")");
+      }
+
+      _raProperties.setUseLocalTx(localTx);
+   }
+
+   public Integer getSetupAttempts()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSetupAttempts()");
+      }
+      return _raProperties.getSetupAttempts();
+   }
+
+   public void setSetupAttempts(Integer setupAttempts)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSetupAttempts(" + setupAttempts + ")");
+      }
+      _raProperties.setSetupAttempts(setupAttempts);
+   }
+
+   public Long getSetupInterval()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSetupInterval()");
+      }
+      return _raProperties.getSetupInterval();
+   }
+
+   public void setSetupInterval(Long interval)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSetupInterval(" + interval + ")");
+      }
+      _raProperties.setSetupInterval(interval);
+   }
+
+   /**
+    * Indicates whether some other object is "equal to" this one.
+    *
+    * @param obj Object with which to compare
+    * @return True if this object is the same as the obj argument; false otherwise.
+    */
+   public boolean equals(final Object obj)
+   {
+      if (obj == null)
+      {
+         return false;
+      }
+
+      if (obj instanceof QpidResourceAdapter)
+      {
+         return _raProperties.equals(((QpidResourceAdapter)obj).getProperties());
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   /**
+    * Return the hash code for the object
+    *
+    * @return The hash code
+    */
+   public int hashCode()
+   {
+      return _raProperties.hashCode();
+   }
+
+   /**
+    * Get the work manager
+    *
+    * @return The manager
+    */
+   public WorkManager getWorkManager()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getWorkManager()");
+      }
+
+      if (_ctx == null)
+      {
+         return null;
+      }
+
+      return _ctx.getWorkManager();
+   }
+
+   public XASession createXASession(final XAConnectionImpl connection)
+      throws Exception
+   {
+      final XASession result = connection.createXASession() ;
+      if (_log.isDebugEnabled())
+      {
+         _log.debug("Using session " + Util.asString(result));
+      }
+      return result ;
+   }
+
+   public Session createSession(final AMQConnection connection,
+                                      final int ackMode,
+                                      final boolean useLocalTx,
+                                      final Integer prefetchLow,
+                                      final Integer prefetchHigh) throws Exception
+   {
+      final Session session;
+
+      // The overload is picked by which prefetch limits were supplied; a high
+      // limit given without a low limit is not used.
+      if (prefetchLow != null && prefetchHigh != null)
+      {
+         session = connection.createSession(useLocalTx, ackMode, prefetchHigh, prefetchLow) ;
+      }
+      else if (prefetchLow != null)
+      {
+         session = connection.createSession(useLocalTx, ackMode, prefetchLow) ;
+      }
+      else
+      {
+         session = connection.createSession(useLocalTx, ackMode) ;
+      }
+
+      if (_log.isDebugEnabled())
+      {
+         _log.debug("Using session " + Util.asString(session));
+      }
+
+      return session;
+   }
+
+   /**
+    * Get the resource adapter properties
+    *
+    * @return The properties
+    */
+   protected QpidRAProperties getProperties()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getProperties()");
+      }
+
+      return _raProperties;
+   }
+
+   /**
+    * Setup the factory
+    */
+   protected void setup() throws QpidRAException
+   {
+      _defaultAMQConnectionFactory = createAMQConnectionFactory(_raProperties);
+   }
+
+
+   /**
+    * Get the default connection factory, creating it from the resource adapter
+    * properties on first use.
+    *
+    * @return The default connection factory
+    * @throws ResourceException Thrown if the factory cannot be created
+    */
+   public AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
+   {
+      // The first caller to flip the flag builds the factory.
+      if (!_configured.getAndSet(true))
+      {
+         try
+         {
+            setup();
+         }
+         catch (QpidRAException e)
+         {
+            // Clear the flag again so that a later call retries setup() instead of
+            // handing out a null factory.
+            _configured.set(false);
+            throw new ResourceException("Unable to create default connection factory", e);
+         }
+      }
+      return _defaultAMQConnectionFactory;
+   }
+
+   public AMQConnectionFactory createAMQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+      throws QpidRAException
+   {
+      try
+      {
+         return createFactory(overrideProperties);
+      }
+      catch (final URLSyntaxException urlse)
+      {
+         throw new QpidRAException("Unexpected exception creating connection factory", urlse) ;
+      }
+   }
+
+   public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
+                                                           final Map<String, Object> overrideConnectionParams)
+   {
+      // Start from the base parameters, then layer the overrides on top so
+      // that an override wins whenever both maps contain the same key.
+      final Map<String, Object> merged = new HashMap<String, Object>();
+
+      if (connectionParams != null)
+      {
+         merged.putAll(connectionParams);
+      }
+
+      if (overrideConnectionParams != null)
+      {
+         merged.putAll(overrideConnectionParams);
+      }
+
+      return merged;
+   }
+
+   private void locateTM()
+   {
+      if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+      {
+
+          String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
+          String locatorMethods[] = _raProperties.getTransactionManagerLocatorMethod().split(";");
+
+          for (int i = 0 ; i < locatorClasses.length; i++)
+          {
+              _tm = Util.locateTM(locatorClasses[i], locatorMethods[i]);
+              if (_tm != null)
+              {
+                  break;
+              }
+          }
+
+
+      }
+
+      if (_tm == null)
+      {
+         _log.warn("It wasn't possible to lookup a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+         _log.warn("Qpid Resource Adapter won't be able to set and verify transaction timeouts in certain cases.");
+      }
+      else
+      {
+         if (_log.isDebugEnabled())
+         {
+            _log.debug("TM located = " + _tm);
+         }
+      }
+   }
+
+
+   /**
+    * Create a connection factory from the supplied properties, falling back to the
+    * resource adapter properties for every value that is not overridden.
+    *
+    * If a connection URL is available it is used as is (with the client id applied when
+    * one is set); otherwise a URL is assembled from the user name, password, client id,
+    * path, host and port.
+    *
+    * @param overrideProperties The properties taking precedence over the adapter defaults
+    * @return The connection factory
+    * @throws URLSyntaxException Propagated from the AMQConnectionFactory calls
+    * @throws QpidRAException Thrown if there is no connection URL and host, port or path is missing
+    */
+   private AMQConnectionFactory createFactory(final ConnectionFactoryProperties overrideProperties)
+      throws URLSyntaxException, QpidRAException
+   {
+      final String overrideURL = overrideProperties.getConnectionURL() ;
+      final String url = overrideURL != null ? overrideURL : _raProperties.getConnectionURL() ;
+
+      final String overrideClientID = overrideProperties.getClientId() ;
+      final String clientID = (overrideClientID != null ? overrideClientID : _raProperties.getClientId()) ;
+
+      final String overrideDefaultPassword = overrideProperties.getPassword() ;
+      final String defaultPassword = (overrideDefaultPassword != null ? overrideDefaultPassword : _raProperties.getPassword()) ;
+
+      final String overrideDefaultUsername = overrideProperties.getUserName() ;
+      final String defaultUsername = (overrideDefaultUsername != null ? overrideDefaultUsername : _raProperties.getUserName()) ;
+
+      final String overrideHost = overrideProperties.getHost() ;
+      final String host = (overrideHost != null ? overrideHost : _raProperties.getHost()) ;
+
+      final Integer overridePort = overrideProperties.getPort() ;
+      final Integer port = (overridePort != null ? overridePort : _raProperties.getPort()) ;
+
+      final String overridePath = overrideProperties.getPath() ;
+      final String path = (overridePath != null ? overridePath : _raProperties.getPath()) ;
+
+      final AMQConnectionFactory cf ;
+
+      if (url != null)
+      {
+         cf = new AMQConnectionFactory(url) ;
+
+         if (clientID != null)
+         {
+            cf.getConnectionURL().setClientName(clientID) ;
+         }
+      }
+      else
+      {
+         // create a URL to force the connection details
+         if ((host == null) || (port == null) || (path == null))
+         {
+            throw new QpidRAException("Configuration requires host/port/path if connectionURL is not specified") ;
+         }
+         final String username = (defaultUsername != null ? defaultUsername : "") ;
+         final String password = (defaultPassword != null ? defaultPassword : "") ;
+         final String client = (clientID != null ? clientID : "") ;
+
+         // The URL is built in two halves around the password so that the debug log
+         // below can show everything except the clear-text password.
+         final String urlPrefix = AMQConnectionURL.AMQ_PROTOCOL + "://" + username + ":" ;
+         final String urlSuffix = "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+         final String newurl = urlPrefix + password + urlSuffix ;
+         if (_log.isDebugEnabled())
+         {
+            _log.debug("Initialising connectionURL to " + urlPrefix + "********" + urlSuffix) ;
+         }
+
+         cf = new AMQConnectionFactory(newurl) ;
+      }
+
+      return cf ;
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+
+import javax.naming.Context;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.transaction.TransactionManager;
+
+import org.apache.qpid.ra.admin.QpidQueue;
+import org.apache.qpid.ra.admin.QpidTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Various utility functions
+ *
+ */
+public class Util
+{
+
+   private static final Logger _log = LoggerFactory.getLogger(Util.class);
+
+   /**
+    * Compare two strings.
+    * @param me First value
+    * @param you Second value
+    * @return True if object equals else false.
+    */
+   public static boolean compare(final String me, final String you)
+   {
+      // The same reference (which includes both being null) is trivially equal
+      if (me == you)
+      {
+         return true;
+      }
+
+      // Past the identity check a null 'me' means 'you' is not null, so they differ;
+      // otherwise fall back to a regular string comparison
+      return me != null && me.equals(you);
+   }
+
+   /**
+    * Look up an object in the supplied context, rebuilding Qpid queues and topics from JNDI references
+    * @param context The context to use
+    * @param name the name to lookup
+    * @param clazz the expected type
+    * @return the object
+    * @throws Exception for any error
+    */
+   public static <T> T lookup(final Context context, final String name, final Class<T> clazz) throws Exception
+   {
+      final Object object = context.lookup(name);
+
+      // When the lookup yields a JNDI Reference naming QpidQueue or QpidTopic, build the
+      // destination from the address stored under that class name.
+      if (object instanceof Reference)
+      {
+         final Reference ref = (Reference) object;
+         final String className = ref.getClassName();
+
+         if (QpidQueue.class.getName().equals(className))
+         {
+            final RefAddr addr = ref.get(QpidQueue.class.getName());
+
+            // Check the address before reading it: Reference.get returns null
+            // when the reference holds no address of that type.
+            if (addr != null)
+            {
+               return (T)new QpidQueue((String) addr.getContent());
+            }
+         }
+
+         if (QpidTopic.class.getName().equals(className))
+         {
+            final RefAddr addr = ref.get(QpidTopic.class.getName());
+
+            if (addr != null)
+            {
+               return (T)new QpidTopic((String) addr.getContent());
+            }
+         }
+      }
+
+      // Anything else, including a reference that could not be rebuilt, has to be
+      // of the requested type already.
+      return clazz.cast(object);
+   }
+
+   /** The Resource adapter can't depend on any provider's specific library. Because of that we use reflection to locate the
+    *  transaction manager during startup.
+    *
+    *
+    *  TODO: We should use a proper SPI instead of reflection
+    *        We would need to define a proper SPI package for this.
+    **/
+   public static TransactionManager locateTM(final String locatorClass, final String locatorMethod)
+   {
+      try
+      {
+         // Load the locator through the thread context class loader, create an
+         // instance of it and call its no-argument locator method.
+         final Class<?> locatorType = Thread.currentThread().getContextClassLoader().loadClass(locatorClass);
+         final Object locator = locatorType.newInstance();
+         final Method accessor = locatorType.getMethod(locatorMethod);
+         return (TransactionManager)accessor.invoke(locator);
+      }
+      catch (Throwable e)
+      {
+         // Any failure is logged at debug level and reported as "not found"
+         _log.debug(e.getMessage(), e);
+         return null;
+      }
+   }
+
+   /**
+    * Serialize the object into a byte array.
+    * @param serializable The serializable object
+    * @return The generated byte array
+    * @throws IOException For errors during serialization.
+    */
+   public static byte[] serialize(final Serializable serializable)
+      throws IOException
+   {
+      // Write the object into an in-memory buffer and hand back the buffer's contents
+      final ByteArrayOutputStream buffer = new ByteArrayOutputStream() ;
+      final ObjectOutputStream out = new ObjectOutputStream(buffer) ;
+
+      out.writeObject(serializable) ;
+      // Closing flushes any pending object data into the buffer before it is copied
+      out.close() ;
+
+      return buffer.toByteArray() ;
+   }
+
+   /**
+    * Deserialize the byte array into an object.
+    * @param data The serialized object as a byte array
+    * @return The serializable object.
+    * @throws IOException For errors during deserialization
+    * @throws ClassNotFoundException If the deserialized class cannot be found.
+    */
+   public static Object deserialize(final byte[] data)
+      throws IOException, ClassNotFoundException
+   {
+      // NOTE: Java native deserialization can instantiate arbitrary classes found on the
+      // class path; only pass bytes that come from a trusted source.
+      final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)) ;
+      try
+      {
+         return ois.readObject() ;
+      }
+      finally
+      {
+         // Release the stream whether or not reading succeeded
+         ois.close() ;
+      }
+   }
+
+   /**
+    * Return a string identification for the specified object.
+    * @param object The object value.
+    * @return The string identification.
+    */
+   public static String asString(final Object object)
+   {
+      if (object == null)
+      {
+         return "null" ;
+      }
+
+      // Class name plus identity hash, independent of any toString() override
+      final String identity = Integer.toHexString(System.identityHashCode(object)) ;
+      return object.getClass().getName() + "@" + identity ;
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import java.util.Hashtable;
+
+import javax.naming.Context;
+import javax.naming.Name;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.spi.ObjectFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JNDI object factory that rebuilds Qpid queue and topic admin objects from
+ * a {@link Reference}.
+ */
+public class AdminObjectFactory implements ObjectFactory
+{
+    private static final Logger _log = LoggerFactory.getLogger(AdminObjectFactory.class);
+
+    /**
+     * Create a QpidQueue or QpidTopic from the given reference.
+     *
+     * @param object  The candidate object; only a Reference naming QpidQueue or QpidTopic is handled
+     * @param name    Not used
+     * @param context Not used
+     * @param env     Not used
+     * @return The destination, or null when the object is not a reference this factory can rebuild
+     * @throws Exception Thrown if the destination cannot be created from the stored address
+     */
+    @Override
+    public Object getObjectInstance(Object object, Name name, Context context, Hashtable<?, ?> env) throws Exception
+    {
+        if (!(object instanceof Reference))
+        {
+            return null;
+        }
+
+        final Reference ref = (Reference) object;
+        final String className = ref.getClassName();
+
+        if (QpidQueue.class.getName().equals(className))
+        {
+            final RefAddr addr = ref.get(QpidQueue.class.getName());
+
+            // Check the address before reading it: Reference.get returns null
+            // when the reference holds no address of that type.
+            if (addr != null)
+            {
+                return new QpidQueue((String) addr.getContent());
+            }
+        }
+
+        if (QpidTopic.class.getName().equals(className))
+        {
+            final RefAddr addr = ref.get(QpidTopic.class.getName());
+
+            if (addr != null)
+            {
+                return new QpidTopic((String) addr.getContent());
+            }
+        }
+
+        // Not a reference this factory can rebuild
+        return null;
+    }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import java.net.URISyntaxException;
+
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+public class QpidBindingURL  extends AMQBindingURL {
+
+	private String _url;
+
+	public QpidBindingURL(String url) throws URISyntaxException {
+		super(url);
+
+		if (!url.contains(BindingURL.OPTION_ROUTING_KEY) || getRoutingKey() == null) {
+			setOption(BindingURL.OPTION_ROUTING_KEY, null);
+		}
+
+		this._url = url;
+	}
+
+	@Override
+	public String getURL() {
+		return _url;
+	}
+
+	@Override
+	public String toString() {
+		return _url;
+	}
+
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra.admin;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.spi.ObjectFactory;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ */
+public class QpidConnectionFactoryProxy implements Externalizable, Referenceable, ConnectionFactory, Serializable
+{
+    private static final Logger _log = LoggerFactory.getLogger(QpidDestinationProxy.class);
+
+    private String _connectionURL;
+
+    private ConnectionFactory _delegate;
+
+    /**
+     * This constructor should not only be used be de-serialisation code. Create
+     * original object with the other constructor.
+     */
+    public QpidConnectionFactoryProxy()
+    {
+    }
+
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+    {
+        Reference ref = (Reference) in.readObject();
+
+        try
+        {
+            _delegate = (ConnectionFactory) dereference(ref);
+
+        } catch (Exception e)
+        {
+            _log.error("Failed to dereference ConnectionFactory " + e.getMessage(), e);
+            throw new IOException("Failed to dereference ConnectionFactory: " + e.getMessage());
+        }
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException
+    {
+        if (_delegate == null)
+        {
+            _log.error("Null Destination ");
+            throw new IOException("Null ConnectionFactory!");
+        }
+
+        try
+        {
+            out.writeObject(((Referenceable) _delegate).getReference());
+        }
+        catch (NamingException e)
+        {
+            _log.error("Failed to dereference ConnectionFactory " + e.getMessage(), e);
+            throw new IOException("Failed to dereference ConnectionFactory: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public Reference getReference() throws NamingException
+    {
+        try
+        {
+            _delegate = new AMQConnectionFactory(getConnectionURL());
+            /*
+            QpidResourceAdapter ra = new QpidResourceAdapter();
+            QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory();
+            mcf.setResourceAdapter(ra);
+            mcf.setConnectionURL(getConnectionURL());
+            delegate = new QpidRAConnectionFactoryImpl(mcf, null);
+            */
+            return ((Referenceable) _delegate).getReference();
+        }
+        catch(Exception e)
+        {
+            throw new NamingException(e.getMessage());
+        }
+    }
+    private Object dereference(Reference ref) throws Exception
+    {
+        ObjectFactory objFactory = (ObjectFactory) Class.forName(
+                ref.getFactoryClassName()).newInstance();
+        return objFactory.getObjectInstance(ref, null, null, null);
+    }
+
+    public void setConnectionURL(final String connectionURL)
+    {
+        this._connectionURL = connectionURL;
+    }
+    public String getConnectionURL()
+    {
+        return this._connectionURL;
+    }
+
+  /**
+    * Create a connection
+    * @return The connection
+    * @exception JMSException Thrown if the operation fails
+    */
+   public Connection createConnection() throws JMSException
+   {
+       return _delegate.createConnection();
+   }
+
+   /**
+    * Create a connection
+    * @param userName The user name
+    * @param password The password
+    * @return The connection
+    * @exception JMSException Thrown if the operation fails
+    */
+   public Connection createConnection(final String userName, final String password) throws JMSException
+   {
+      return _delegate.createConnection(userName, password);
+   }
+
+}
+

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra.admin;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+import javax.jms.Destination;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.spi.ObjectFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The QpidDestinationProxy provides for allowing an administrator/developer to
+ * create and bind QPID destinations into a JNDI tree. AdminObjects are used as
+ * an generic integration point rather than relying on the EE server specific
+ * API's to create destinations (queues, topics). AdminObjects and associated
+ * properties are defined in the ra.xml file for a particular JCA adapter.
+ * Please see the ra.xml file for the QPID JCA resource adapter as well as the
+ * README.txt for the adapter for more details.
+ *
+ */
+public class QpidDestinationProxy implements Externalizable, Referenceable, Destination, Serializable
+{
+    private static final long serialVersionUID = -1137413782643796461L;
+
+    private static final Logger _log = LoggerFactory.getLogger(QpidDestinationProxy.class);
+
+    private static final String DEFAULT_QUEUE_TYPE = "QUEUE";
+
+    private static final String DEFAULT_TOPIC_TYPE = "TOPIC";
+
+    private String _destinationAddress;
+
+    private String _destinationType;
+
+    private Destination _delegate;
+
+    /**
+     * This constructor should not only be used be de-serialisation code. Create
+     * original object with the other constructor.
+     */
+    public QpidDestinationProxy()
+    {
+    }
+
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+    {
+        Reference ref = (Reference) in.readObject();
+
+        try
+        {
+            _delegate = (Destination) dereference(ref);
+
+        } catch (Exception e)
+        {
+            _log.error("Failed to dereference Destination " + e.getMessage(), e);
+            throw new IOException("Failed to dereference Destination: " + e.getMessage());
+        }
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException
+    {
+        if (_delegate == null)
+        {
+            _log.error("Null Destination ");
+            throw new IOException("Null destination!");
+        }
+
+        try
+        {
+            out.writeObject(((Referenceable) _delegate).getReference());
+        }
+        catch (NamingException e)
+        {
+            _log.error("Failed to dereference Destination " + e.getMessage(), e);
+            throw new IOException("Failed to dereference Destination: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public Reference getReference() throws NamingException
+    {
+        try
+        {
+            if(getDestinationType().equalsIgnoreCase(DEFAULT_QUEUE_TYPE))
+            {
+                _delegate = new QpidQueue(getDestinationAddress());
+            }
+            else if(getDestinationType().equalsIgnoreCase(DEFAULT_TOPIC_TYPE))
+            {
+                _delegate = new QpidTopic(getDestinationAddress());
+            }
+            else
+            {
+                throw new IllegalStateException("Unknown destination type " + getDestinationType());
+            }
+
+            return ((Referenceable) _delegate).getReference();
+
+        }
+        catch(Exception e)
+        {
+            _log.error(e.getMessage(),e);
+            throw new NamingException("Failed to create destination " + e.getMessage());
+        }
+
+    }
+
+    private Object dereference(Reference ref) throws Exception
+    {
+        ObjectFactory objFactory = (ObjectFactory) Class.forName(
+                ref.getFactoryClassName()).newInstance();
+        return objFactory.getObjectInstance(ref, null, null, null);
+    }
+
+    public void setDestinationAddress(String destinationAddress) throws Exception
+    {
+        this._destinationAddress = destinationAddress;
+    }
+
+    public String getDestinationAddress()
+    {
+        return this._destinationAddress;
+    }
+
+    public void setDestinationType(String destinationType)
+    {
+        this._destinationType = destinationType;
+    }
+
+    public String getDestinationType()
+    {
+        return this._destinationType;
+    }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpid.client.AMQQueue;
+
+/**
+ * An {@link AMQQueue} that keeps the address string it was created from and
+ * publishes it through a JNDI {@link Reference} resolved by
+ * {@link AdminObjectFactory}.
+ */
+public class QpidQueue extends AMQQueue
+{
+    /** The address exactly as supplied to the constructor. */
+    private String _url;
+
+    /**
+     * @param address the queue address handed to the parent class
+     * @throws Exception if the parent class rejects the address
+     */
+    public QpidQueue(final String address) throws Exception
+    {
+        super(address);
+        _url = address;
+    }
+
+    /**
+     * Build a reference whose class name and address type are both the runtime
+     * class name of this object, and whose address content is {@link #toURL()}.
+     *
+     * @return a reference naming {@link AdminObjectFactory} as its factory
+     * @throws NamingException declared by the overridden method
+     */
+    @Override
+    public Reference getReference() throws NamingException
+    {
+        final String className = getClass().getName();
+        final StringRefAddr urlAddr = new StringRefAddr(className, toURL());
+        final String factoryName = AdminObjectFactory.class.getName();
+
+        return new Reference(className, urlAddr, factoryName, null);
+    }
+
+    /**
+     * @return the address string this queue was constructed with
+     */
+    @Override
+    public String toURL()
+    {
+        return _url;
+    }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpid.client.AMQTopic;
+
+/**
+ * An {@link AMQTopic} that keeps the address string it was created from and
+ * publishes it through a JNDI {@link Reference} resolved by
+ * {@link AdminObjectFactory}.
+ */
+public class QpidTopic extends AMQTopic
+{
+    /** The address exactly as supplied to the constructor. */
+    private String _url;
+
+    /**
+     * @param address the topic address handed to the parent class
+     * @throws Exception if the parent class rejects the address
+     */
+    public QpidTopic(final String address) throws Exception
+    {
+        super(address);
+        _url = address;
+    }
+
+    /**
+     * Build a reference whose class name and address type are both the runtime
+     * class name of this object, and whose address content is {@link #toURL()}.
+     *
+     * @return a reference naming {@link AdminObjectFactory} as its factory
+     * @throws NamingException declared by the overridden method
+     */
+    @Override
+    public Reference getReference() throws NamingException
+    {
+        final String className = getClass().getName();
+        final StringRefAddr urlAddr = new StringRefAddr(className, toURL());
+        final String factoryName = AdminObjectFactory.class.getName();
+
+        return new Reference(className, urlAddr, factoryName, null);
+    }
+
+    /**
+     * @return the address string this topic was constructed with
+     */
+    @Override
+    public String toURL()
+    {
+        return _url;
+    }
+
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,593 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+
+/**
+ * The activation.
+ *
+ */
+public class QpidActivation implements ExceptionListener
+{
+   /**
+    * The logger
+    */
+   private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
+
+   /**
+    * The onMessage method
+    */
+   public static final Method ONMESSAGE;
+
+   /**
+    * The resource adapter
+    */
+   private final QpidResourceAdapter _ra;
+
+   /**
+    * The activation spec
+    */
+   private final QpidActivationSpec _spec;
+
+   /**
+    * The message endpoint factory
+    */
+   private final MessageEndpointFactory _endpointFactory;
+
+   /**
+    * Whether delivery is active
+    */
+   private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+   /**
+    * The destination type
+    */
+   private boolean _isTopic = false;
+
+   /**
+    * Is the delivery transacted
+    */
+   private boolean _isDeliveryTransacted;
+
+   private Destination _destination;
+
+   /**
+    * The connection
+    */
+   private Connection _connection;
+
+   private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
+
+   private AMQConnectionFactory _factory;
+
+   // Whether we are in the failure recovery loop
+   private AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+   static
+   {
+      try
+      {
+         ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+   }
+
+   /**
+    * Constructor
+    *
+    * @param ra              The resource adapter
+    * @param endpointFactory The endpoint factory
+    * @param spec            The activation spec
+    * @throws ResourceException Thrown if an error occurs
+    */
+   public QpidActivation(final QpidResourceAdapter ra,
+                            final MessageEndpointFactory endpointFactory,
+                            final QpidActivationSpec spec) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+      }
+
+      this._ra = ra;
+      this._endpointFactory = endpointFactory;
+      this._spec = spec;
+      try
+      {
+         _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+      }
+      catch (Exception e)
+      {
+         throw new ResourceException(e);
+      }
+   }
+
+   /**
+    * Get the activation spec
+    *
+    * @return The value
+    */
+   public QpidActivationSpec getActivationSpec()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getActivationSpec()");
+      }
+
+      return _spec;
+   }
+
+   /**
+    * Get the message endpoint factory
+    *
+    * @return The value
+    */
+   public MessageEndpointFactory getMessageEndpointFactory()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMessageEndpointFactory()");
+      }
+
+      return _endpointFactory;
+   }
+
+   /**
+    * Get whether delivery is transacted
+    *
+    * @return The value
+    */
+   public boolean isDeliveryTransacted()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("isDeliveryTransacted()");
+      }
+
+      return _isDeliveryTransacted;
+   }
+
+   /**
+    * Get the work manager
+    *
+    * @return The value
+    */
+   public WorkManager getWorkManager()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getWorkManager()");
+      }
+
+      return _ra.getWorkManager();
+   }
+
+   /**
+    * Is the destination a topic
+    *
+    * @return The value
+    */
+   public boolean isTopic()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("isTopic()");
+      }
+
+      return _isTopic;
+   }
+
+   /**
+    * Start the activation
+    *
+    * @throws ResourceException Thrown if an error occurs
+    */
+   public void start() throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("start()");
+      }
+      _deliveryActive.set(true);
+      _ra.getWorkManager().scheduleWork(new SetupActivation());
+   }
+
+   /**
+    * Stop the activation
+    */
+   public void stop()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("stop()");
+      }
+
+      _deliveryActive.set(false);
+      teardown();
+   }
+
+   /**
+    * Setup the activation
+    *
+    * @throws Exception Thrown if an error occurs
+    */
+   protected synchronized void setup() throws Exception
+   {
+      _log.debug("Setting up " + _spec);
+      setupCF();
+
+      setupDestination();
+      final AMQConnection amqConnection ;
+      final boolean useLocalTx = _spec.isUseLocalTx() ;
+      final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
+
+      if (isXA)
+      {
+         amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+      }
+      else
+      {
+         amqConnection = (AMQConnection)_factory.createConnection() ;
+      }
+
+      amqConnection.setExceptionListener(this) ;
+
+      for (int i = 0; i < _spec.getMaxSession(); i++)
+      {
+         Session session = null;
+
+         try
+         {
+            if (isXA)
+            {
+               session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
+            }
+            else
+            {
+               session = _ra.createSession((AMQConnection)amqConnection,
+                     _spec.getAcknowledgeModeInt(),
+                     useLocalTx,
+                     _spec.getPrefetchLow(),
+                     _spec.getPrefetchHigh());
+            }
+
+            _log.debug("Using session " + Util.asString(session));
+            QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
+            handler.setup();
+            _handlers.add(handler);
+         }
+         catch (Exception e)
+         {
+            try
+            {
+               amqConnection.close() ;
+            }
+            catch (Exception e2)
+            {
+               _log.trace("Ignored error closing connection", e2);
+            }
+
+            throw e;
+         }
+      }
+      amqConnection.start() ;
+      this._connection = amqConnection ;
+
+      _log.debug("Setup complete " + this);
+   }
+
+   /**
+    * Teardown the activation
+    */
+   protected synchronized void teardown()
+   {
+      _log.debug("Tearing down " + _spec);
+
+      try
+      {
+         if (_connection != null)
+         {
+            _connection.stop();
+         }
+      }
+      catch (Throwable t)
+      {
+         _log.debug("Error stopping connection " + Util.asString(_connection), t);
+      }
+
+      for (QpidMessageHandler handler : _handlers)
+      {
+         handler.teardown();
+      }
+
+      try
+      {
+         if (_connection != null)
+         {
+            _connection.close();
+         }
+      }
+      catch (Throwable t)
+      {
+         _log.debug("Error closing connection " + Util.asString(_connection), t);
+      }
+      if (_spec.isHasBeenUpdated())
+      {
+         _factory = null;
+      }
+      _log.debug("Tearing down complete " + this);
+   }
+
+   protected void setupCF() throws Exception
+   {
+      if (_spec.isHasBeenUpdated())
+      {
+         _factory = _ra.createAMQConnectionFactory(_spec);
+      }
+      else
+      {
+         _factory = _ra.getDefaultAMQConnectionFactory();
+      }
+   }
+
+   public Destination getDestination()
+   {
+      return _destination;
+   }
+
+   protected void setupDestination() throws Exception
+   {
+
+      String destinationName = _spec.getDestination();
+      String destinationTypeString = _spec.getDestinationType();
+
+      if (_spec.isUseJNDI())
+      {
+         Context ctx = new InitialContext();
+         _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+         if (_log.isTraceEnabled())
+         {
+            _log.trace("setupDestination(" + ctx + ")");
+         }
+
+         if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+         {
+            _log.debug("Destination type defined as " + destinationTypeString);
+
+            Class<? extends Destination> destinationType;
+            if (Topic.class.getName().equals(destinationTypeString))
+            {
+               destinationType = Topic.class;
+               _isTopic = true;
+            }
+            else
+            {
+               destinationType = Queue.class;
+            }
+
+            _log.debug("Retrieving destination " + destinationName +
+                                        " of type " +
+                                        destinationType.getName());
+            _destination = Util.lookup(ctx, destinationName, destinationType);
+            //_destination = (Destination)ctx.lookup(destinationName);
+
+         }
+         else
+         {
+            _log.debug("Destination type not defined");
+            _log.debug("Retrieving destination " + destinationName +
+                                        " of type " +
+                                        Destination.class.getName());
+
+            _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+            _isTopic = !(_destination instanceof Queue) ;
+         }
+      }
+      else
+      {
+         _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+         if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+         {
+            _log.debug("Destination type defined as " + destinationTypeString);
+            final boolean match ;
+            if (Topic.class.getName().equals(destinationTypeString))
+            {
+               match = (_destination instanceof Topic) ;
+               _isTopic = true;
+            }
+            else
+            {
+               match = (_destination instanceof Queue) ;
+            }
+            if (!match)
+            {
+               throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+            }
+         }
+         else
+         {
+            _isTopic = !(_destination instanceof Queue) ;
+         }
+      }
+
+      _log.debug("Got destination " + _destination + " from " + destinationName);
+   }
+
+   /**
+    * Get a string representation
+    *
+    * @return The value
+    */
+   @Override
+   public String toString()
+   {
+      StringBuffer buffer = new StringBuffer();
+      buffer.append(QpidActivation.class.getName()).append('(');
+      buffer.append("spec=").append(_spec.getClass().getName());
+      buffer.append(" mepf=").append(_endpointFactory.getClass().getName());
+      buffer.append(" active=").append(_deliveryActive.get());
+      if (_spec.getDestination() != null)
+      {
+         buffer.append(" destination=").append(_spec.getDestination());
+      }
+      buffer.append(" transacted=").append(_isDeliveryTransacted);
+      buffer.append(')');
+      return buffer.toString();
+   }
+
+   public void onException(final JMSException jmse)
+   {
+      handleFailure(jmse) ;
+   }
+
+   /**
+    * Handles any failure by trying to reconnect
+    *
+    * @param failure the reason for the failure
+    */
+   public void handleFailure(Throwable failure)
+   {
+      if(doesNotExist(failure))
+      {
+         _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
+      }
+      else
+      {
+         _log.warn("Failure in Qpid activation " + _spec, failure);
+      }
+      int reconnectCount = 0;
+      int setupAttempts = _spec.getSetupAttempts();
+      long setupInterval = _spec.getSetupInterval();
+
+      // Only enter the failure loop once
+      if (_inFailure.getAndSet(true))
+         return;
+      try
+      {
+         while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+         {
+            teardown();
+
+            try
+            {
+               Thread.sleep(setupInterval);
+            }
+            catch (InterruptedException e)
+            {
+               _log.debug("Interrupted trying to reconnect " + _spec, e);
+               break;
+            }
+
+            _log.info("Attempting to reconnect " + _spec);
+            try
+            {
+               setup();
+               _log.info("Reconnected with Qpid");
+               break;
+            }
+            catch (Throwable t)
+            {
+               if(doesNotExist(failure))
+               {
+                  _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
+               }
+               else
+               {
+                  _log.error("Unable to reconnect " + _spec, t);
+               }
+            }
+            ++reconnectCount;
+         }
+      }
+      finally
+      {
+         // Leaving failure recovery loop
+         _inFailure.set(false);
+      }
+   }
+
+   /**
+    * Check to see if the failure represents a missing endpoint
+    * @param failure The failure.
+    * @return true if it represents a missing endpoint, false otherwise
+    */
+   private boolean doesNotExist(final Throwable failure)
+   {
+      return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+   }
+
+   /**
+    * Handles the setup
+    */
+   private class SetupActivation implements Work
+   {
+      public void run()
+      {
+         try
+         {
+            setup();
+         }
+         catch (Throwable t)
+         {
+            handleFailure(t);
+         }
+      }
+
+      public void release()
+      {
+      }
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,604 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.io.Serializable;
+
+import javax.jms.Session;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.InvalidPropertyException;
+import javax.resource.spi.ResourceAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.ra.ConnectionFactoryProperties;
+import org.apache.qpid.ra.QpidResourceAdapter;
+
+/**
+ * The activation spec
+ * These properties are set on the MDB ActivationProperties
+ *
+ */
+public class QpidActivationSpec extends ConnectionFactoryProperties implements ActivationSpec, Serializable
+{
+   private static final long serialVersionUID = 7379131936083146158L;
+
+   private static final int DEFAULT_MAX_SESSION = 15;
+
+   /** The logger */
+   private static final transient Logger _log = LoggerFactory.getLogger(QpidActivationSpec.class);
+
+   /** The resource adapter */
+   private QpidResourceAdapter _ra;
+
+   /** The destination */
+   private String _destination;
+
+   /** The destination type */
+   private String _destinationType;
+
+   /** The message selector */
+   private String _messageSelector;
+
+   /** The acknowledgement mode */
+   private int _acknowledgeMode;
+
+   /** The subscription durability */
+   private boolean _subscriptionDurability;
+
+   /** The subscription name */
+   private String _subscriptionName;
+
+   /** The maximum number of sessions */
+   private Integer _maxSession;
+
+   /** Transaction timeout */
+   private Integer _transactionTimeout;
+
+   /** prefetch low */
+   private Integer _prefetchLow;
+
+   /** prefetch high */
+   private Integer _prefetchHigh;
+
+   private boolean _useJNDI = true;
+
+   // undefined by default, default is specified at the RA level in QpidRAProperties
+   private Integer _setupAttempts;
+
+   // undefined by default, default is specified at the RA level in QpidRAProperties
+   private Long _setupInterval;
+
+   /**
+    * Constructor
+    */
+   public QpidActivationSpec()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("constructor()");
+      }
+
+      _acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+      _maxSession = DEFAULT_MAX_SESSION;
+      _transactionTimeout = 0;
+   }
+
+   /**
+    * Get the resource adapter
+    * @return The resource adapter
+    */
+   public ResourceAdapter getResourceAdapter()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getResourceAdapter()");
+      }
+
+      return _ra;
+   }
+
+   /**
+    * @return the useJNDI
+    */
+   public boolean isUseJNDI()
+   {
+      return _useJNDI;
+   }
+
+   /**
+    * @param value the useJNDI to set
+    */
+   public void setUseJNDI(final boolean value)
+   {
+      _useJNDI = value;
+   }
+
+   /**
+    * Set the resource adapter
+    * @param ra The resource adapter
+    * @exception ResourceException Thrown if incorrect resource adapter
+    */
+   public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setResourceAdapter(" + ra + ")");
+      }
+
+      if (ra == null || !(ra instanceof QpidResourceAdapter))
+      {
+         throw new ResourceException("Resource adapter is " + ra);
+      }
+
+      this._ra = (QpidResourceAdapter)ra;
+   }
+
+   /**
+    * Get the destination
+    * @return The value
+    */
+   public String getDestination()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDestination()");
+      }
+
+      return _destination;
+   }
+
+   /**
+    * Set the destination
+    * @param value The value
+    */
+   public void setDestination(final String value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDestination(" + value + ")");
+      }
+
+      _destination = value;
+   }
+
+   /**
+    * Get the destination type
+    * @return The value
+    */
+   public String getDestinationType()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getDestinationType()");
+      }
+
+      return _destinationType;
+   }
+
+   /**
+    * Set the destination type
+    * @param value The value
+    */
+   public void setDestinationType(final String value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setDestinationType(" + value + ")");
+      }
+
+      _destinationType = value;
+   }
+
+   /**
+    * Get the message selector
+    * @return The value
+    */
+   public String getMessageSelector()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMessageSelector()");
+      }
+
+      return _messageSelector;
+   }
+
+   /**
+    * Set the message selector
+    * @param value The value
+    */
+   public void setMessageSelector(final String value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setMessageSelector(" + value + ")");
+      }
+
+      _messageSelector = value;
+   }
+
+   /**
+    * Get the acknowledge mode
+    * @return The value
+    */
+   public String getAcknowledgeMode()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getAcknowledgeMode()");
+      }
+
+      if (Session.DUPS_OK_ACKNOWLEDGE == _acknowledgeMode)
+      {
+         return "Dups-ok-acknowledge";
+      }
+      else
+      {
+         return "Auto-acknowledge";
+      }
+   }
+
+   /**
+    * Set the acknowledge mode
+    * @param value The value
+    */
+   public void setAcknowledgeMode(final String value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setAcknowledgeMode(" + value + ")");
+      }
+
+      if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(value) || "Dups-ok-acknowledge".equalsIgnoreCase(value))
+      {
+         _acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
+      }
+      else if ("AUTO_ACKNOWLEDGE".equalsIgnoreCase(value) || "Auto-acknowledge".equalsIgnoreCase(value))
+      {
+         _acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+      }
+      else
+      {
+         throw new IllegalArgumentException("Unsupported acknowledgement mode " + value);
+      }
+   }
+
+   /**
+    * @return the acknowledgement mode
+    */
+   public int getAcknowledgeModeInt()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getAcknowledgeMode()");
+      }
+
+      return _acknowledgeMode;
+   }
+
+   /**
+    * Get the subscription durability
+    * @return The value
+    */
+   public String getSubscriptionDurability()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSubscriptionDurability()");
+      }
+
+      if (_subscriptionDurability)
+      {
+         return "Durable";
+      }
+      else
+      {
+         return "NonDurable";
+      }
+   }
+
+   /**
+    * Set the subscription durability
+    * @param value The value
+    */
+   public void setSubscriptionDurability(final String value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSubscriptionDurability(" + value + ")");
+      }
+
+      _subscriptionDurability = "Durable".equals(value);
+   }
+
+   /**
+    * Get the status of subscription durability
+    * @return The value
+    */
+   public boolean isSubscriptionDurable()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("isSubscriptionDurable()");
+      }
+
+      return _subscriptionDurability;
+   }
+
+   /**
+    * Get the subscription name
+    * @return The value
+    */
+   public String getSubscriptionName()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSubscriptionName()");
+      }
+
+      return _subscriptionName;
+   }
+
+   /**
+    * Set the subscription name
+    * @param value The value
+    */
+   public void setSubscriptionName(final String value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSubscriptionName(" + value + ")");
+      }
+
+      _subscriptionName = value;
+   }
+
+   /**
+    * Get the number of max session
+    * @return The value
+    */
+   public Integer getMaxSession()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getMaxSession()");
+      }
+
+      if (_maxSession == null)
+      {
+         return DEFAULT_MAX_SESSION;
+      }
+
+      return _maxSession;
+   }
+
+   /**
+    * Set the number of max session
+    * @param value The value
+    */
+   public void setMaxSession(final Integer value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setMaxSession(" + value + ")");
+      }
+
+      _maxSession = value;
+   }
+
+   /**
+    * Get the transaction timeout
+    * @return The value
+    */
+   public Integer getTransactionTimeout()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getTransactionTimeout()");
+      }
+
+      return _transactionTimeout;
+   }
+
+   /**
+    * Set the transaction timeout
+    * @param value The value
+    */
+   public void setTransactionTimeout(final Integer value)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setTransactionTimeout(" + value + ")");
+      }
+
+      _transactionTimeout = value;
+   }
+
+   /**
+    * Get the prefetch low
+    * @return The value
+    */
+   public Integer getPrefetchLow()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPrefetchLow()");
+      }
+
+      return _prefetchLow;
+   }
+
+   /**
+    * Set the prefetch low
+    * @param value The value
+    */
+   public void setPrefetchLow(final Integer prefetchLow)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPrefetchLow(" + prefetchLow + ")");
+      }
+
+      this._prefetchLow = prefetchLow;
+   }
+
+   /**
+    * Get the prefetch high
+    * @return The value
+    */
+   public Integer getPrefetchHigh()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getPrefetchHigh()");
+      }
+
+      return _prefetchHigh;
+   }
+
+   /**
+    * Set the prefetch high
+    * @param value The value
+    */
+   public void setPrefetchHigh(final Integer prefetchHigh)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setPrefetchHigh(" + prefetchHigh + ")");
+      }
+
+      this._prefetchHigh = prefetchHigh;
+   }
+
+   public int getSetupAttempts()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSetupAttempts()");
+      }
+
+      if (_setupAttempts == null)
+      {
+         return _ra.getSetupAttempts();
+      }
+      else
+      {
+         return _setupAttempts;
+      }
+   }
+
+   public void setSetupAttempts(int setupAttempts)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSetupAttempts(" + setupAttempts + ")");
+      }
+
+      this._setupAttempts = setupAttempts;
+   }
+
+   public long getSetupInterval()
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("getSetupInterval()");
+      }
+
+      if (_setupInterval == null)
+      {
+         return _ra.getSetupInterval();
+      }
+      else
+      {
+         return _setupInterval;
+      }
+   }
+
+   public void setSetupInterval(long setupInterval)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("setSetupInterval(" + setupInterval + ")");
+      }
+
+      this._setupInterval = setupInterval;
+   }
+
+   /**
+    * Validate
+    * @exception InvalidPropertyException Thrown if a validation exception occurs
+    */
+   public void validate() throws InvalidPropertyException
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("validate()");
+      }
+
+      if (_destination == null || _destination.trim().equals(""))
+      {
+         throw new InvalidPropertyException("Destination is mandatory");
+      }
+   }
+
+   /**
+    * Get a string representation
+    * @return The value
+    */
+   @Override
+   public String toString()
+   {
+      StringBuffer buffer = new StringBuffer();
+      buffer.append(QpidActivationSpec.class.getName()).append('(');
+      buffer.append("ra=").append(_ra);
+      buffer.append(" destination=").append(_destination);
+      buffer.append(" destinationType=").append(_destinationType);
+      if (_messageSelector != null)
+      {
+         buffer.append(" selector=").append(_messageSelector);
+      }
+      buffer.append(" ack=").append(getAcknowledgeMode());
+      buffer.append(" durable=").append(_subscriptionDurability);
+      buffer.append(" clientID=").append(getClientId());
+      if (_subscriptionName != null)
+      {
+         buffer.append(" subscription=").append(_subscriptionName);
+      }
+      buffer.append(" user=").append(getUserName());
+      if (getPassword() != null)
+      {
+         buffer.append(" password=").append("****");
+      }
+      buffer.append(" maxSession=").append(_maxSession);
+      if (_prefetchLow != null)
+      {
+         buffer.append(" prefetchLow=").append(_prefetchLow);
+      }
+      if (_prefetchHigh != null)
+      {
+         buffer.append(" prefetchHigh=").append(_prefetchHigh);
+      }
+      buffer.append(')');
+      return buffer.toString();
+   }
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,245 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XASession;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.qpid.ra.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The message handler
+ *
+ */
+public class QpidMessageHandler implements MessageListener
+{
+   /**
+    * The logger
+    */
+   private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
+
+   /**
+    * The session
+    */
+   private final Session _session;
+
+   /**
+    * The consumer created by setup()
+    */
+   private MessageConsumer _consumer;
+
+   /**
+    * The endpoint
+    */
+   private MessageEndpoint _endpoint;
+
+   private final QpidActivation _activation;
+
+   /**
+    * The activation spec's useLocalTx flag, captured by setup()
+    */
+   private boolean _useLocalTx;
+
+   /**
+    * True when delivery is transacted or a local transaction is in use
+    */
+   private boolean _transacted;
+
+   /**
+    * The transaction manager; onMessage() tolerates this being null
+    */
+   private final TransactionManager _tm;
+
+   /**
+    * Constructor
+    * @param activation The activation this handler belongs to
+    * @param tm The transaction manager
+    * @param session The session used to create the consumer
+    */
+   public QpidMessageHandler(final QpidActivation activation,
+                                final TransactionManager tm,
+                                final Session session)
+   {
+      this._activation = activation;
+      this._session = session;
+      this._tm = tm;
+   }
+
+   /**
+    * Create the consumer and the endpoint, then register this handler as the
+    * consumer's message listener.
+    * @exception Exception Thrown if the consumer or the endpoint cannot be created
+    */
+   public void setup() throws Exception
+   {
+      _log.trace("setup()");
+
+      final QpidActivationSpec spec = _activation.getActivationSpec();
+      final String selector = spec.getMessageSelector();
+
+      // Create the message consumer
+      if (_activation.isTopic())
+      {
+         final Topic topic = (Topic) _activation.getDestination();
+         final String subscriptionName = spec.getSubscriptionName();
+         if (spec.isSubscriptionDurable())
+         {
+            _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+         }
+         else
+         {
+            _consumer = _session.createConsumer(topic, selector);
+         }
+      }
+      else
+      {
+         final Queue queue = (Queue) _activation.getDestination();
+         _consumer = _session.createConsumer(queue, selector);
+      }
+
+      // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
+      final MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
+      final boolean deliveryTransacted = _activation.isDeliveryTransacted();
+      _useLocalTx = spec.isUseLocalTx();
+      _transacted = deliveryTransacted || _useLocalTx;
+      if (deliveryTransacted && !_useLocalTx)
+      {
+         final XAResource xaResource = ((XASession) _session).getXAResource();
+         _endpoint = endpointFactory.createEndpoint(xaResource);
+      }
+      else
+      {
+         _endpoint = endpointFactory.createEndpoint(null);
+      }
+      _consumer.setMessageListener(this);
+   }
+
+   /**
+    * Stop the handler by releasing the endpoint, if there is one. Failures are
+    * logged at debug level and not propagated.
+    */
+   public void teardown()
+   {
+      _log.trace("teardown()");
+
+      try
+      {
+         if (_endpoint != null)
+         {
+            _endpoint.release();
+            _endpoint = null;
+         }
+      }
+      catch (Throwable t)
+      {
+         _log.debug("Error releasing endpoint " + _endpoint, t);
+      }
+   }
+
+   /**
+    * Deliver a message to the endpoint between beforeDelivery() and
+    * afterDelivery(), acknowledging it or recovering/rolling back the session
+    * depending on the outcome. Nothing is propagated to the caller; failures
+    * are logged.
+    * @param message The message to deliver
+    */
+   public void onMessage(final Message message)
+   {
+      if (_log.isTraceEnabled())
+      {
+         _log.trace("onMessage(" + Util.asString(message) + ")");
+      }
+
+      boolean beforeDelivery = false;
+
+      try
+      {
+         // The timeout is an Integer and may have been set to null on the spec
+         final Integer transactionTimeout = _activation.getActivationSpec().getTransactionTimeout();
+         if (transactionTimeout != null && transactionTimeout > 0 && _tm != null)
+         {
+            _tm.setTransactionTimeout(transactionTimeout);
+         }
+
+         _endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
+         beforeDelivery = true;
+
+         if (_transacted)
+         {
+            message.acknowledge();
+         }
+
+         ((MessageListener) _endpoint).onMessage(message);
+
+         // Without a transaction manager there is no transaction status to inspect
+         if (_transacted && _tm != null && _tm.getTransaction() != null)
+         {
+            final int status = _tm.getStatus();
+            final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
+               || status == Status.STATUS_ROLLING_BACK
+               || status == Status.STATUS_ROLLEDBACK;
+            if (rollback)
+            {
+               _session.recover();
+            }
+         }
+         else
+         {
+            message.acknowledge();
+         }
+
+         try
+         {
+            _endpoint.afterDelivery();
+         }
+         catch (ResourceException e)
+         {
+            _log.warn("Unable to call after delivery", e);
+            return;
+         }
+         if (_useLocalTx)
+         {
+            _session.commit();
+         }
+      }
+      catch (Throwable deliveryFailure)
+      {
+         _log.error("Failed to deliver message", deliveryFailure);
+         // we need to call before/afterDelivery as a pair
+         if (beforeDelivery)
+         {
+            try
+            {
+               _endpoint.afterDelivery();
+            }
+            catch (ResourceException afterDeliveryFailure)
+            {
+               // Log the afterDelivery failure itself, not the delivery failure reported above
+               _log.warn("Unable to call after delivery", afterDeliveryFailure);
+            }
+         }
+         // NOTE(review): rollback() is also attempted when neither local TX nor
+         // transacted delivery is in use; confirm that session is transacted.
+         if (_useLocalTx || !_activation.isDeliveryTransacted())
+         {
+            try
+            {
+               _session.rollback();
+            }
+            catch (JMSException rollbackFailure)
+            {
+               _log.warn("Unable to roll local transaction back", rollbackFailure);
+            }
+         }
+         else
+         {
+            try
+            {
+               _session.recover();
+            }
+            catch (JMSException recoverFailure)
+            {
+               _log.warn("Unable to recover XA transaction", recoverFailure);
+            }
+         }
+      }
+
+   }
+
+}

Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.ra.tm;
+
+import java.util.Set;
+
+import javax.transaction.TransactionManager;
+
+import org.apache.geronimo.gbean.AbstractName;
+import org.apache.geronimo.gbean.AbstractNameQuery;
+import org.apache.geronimo.kernel.Kernel;
+import org.apache.geronimo.kernel.KernelRegistry;
+
+public class GeronimoTransactionManagerLocator
+{
+
+    /**
+     * Creates a locator; it keeps no state.
+     */
+    public GeronimoTransactionManagerLocator()
+    {
+    }
+
+    /**
+     * Looks up the transaction manager GBean through the kernel returned by
+     * KernelRegistry.getSingleKernel(), querying by the TransactionManager
+     * class name.
+     *
+     * @return the single matching GBean, cast to TransactionManager
+     * @throws RuntimeException wrapping any exception raised during the lookup,
+     *         including the IllegalStateException raised when the query does
+     *         not match exactly one GBean
+     */
+    public TransactionManager getTransactionManager()
+    {
+        try
+        {
+            final Kernel geronimoKernel = KernelRegistry.getSingleKernel();
+            final Set<AbstractName> candidates =
+                geronimoKernel.listGBeans(new AbstractNameQuery(TransactionManager.class.getName ()));
+
+            if (candidates.size() == 1)
+            {
+                final AbstractName onlyMatch = candidates.iterator().next();
+                return (TransactionManager) geronimoKernel.getGBean(onlyMatch);
+            }
+
+            throw new IllegalStateException("Expected one transaction manager, not " + candidates.size());
+        }
+        catch(Exception lookupFailure)
+        {
+            throw new RuntimeException(lookupFailure);
+        }
+    }
+
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org