You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/12/18 06:09:10 UTC
svn commit: r1220336 [7/8] - in /qpid/trunk/qpid/java: ./
client/src/main/java/org/apache/qpid/client/ jca/ jca/example/
jca/example/conf/ jca/example/src/ jca/example/src/main/
jca/example/src/main/java/ jca/example/src/main/java/org/ jca/example/src/...
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,820 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.ra.inflow.QpidActivation;
+import org.apache.qpid.ra.inflow.QpidActivationSpec;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The resource adapter for Qpid
+ *
+ */
+public class QpidResourceAdapter implements ResourceAdapter, Serializable
+{
+ /**
+ *
+ */
+ private static final long serialVersionUID = -2446231446818098726L;
+
+ /**
+ * The logger
+ */
+ private static final Logger _log = LoggerFactory.getLogger(QpidResourceAdapter.class);
+
+ /**
+ * The bootstrap context
+ */
+ private BootstrapContext _ctx;
+
+ /**
+ * The resource adapter properties
+ */
+ private final QpidRAProperties _raProperties;
+
+ /**
+ * Have the factory been configured
+ */
+ private final AtomicBoolean _configured;
+
+ /**
+ * The activations by activation spec
+ */
+ private final Map<ActivationSpec, QpidActivation> _activations;
+
+ private AMQConnectionFactory _defaultAMQConnectionFactory;
+
+ private TransactionManager _tm;
+
+ /**
+ * Constructor
+ */
+ public QpidResourceAdapter()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor()");
+ }
+
+ _raProperties = new QpidRAProperties();
+ _configured = new AtomicBoolean(false);
+ _activations = new ConcurrentHashMap<ActivationSpec, QpidActivation>();
+ }
+
+ public TransactionManager getTM()
+ {
+ return _tm;
+ }
+
+ /**
+ * Endpoint activation
+ *
+ * @param endpointFactory The endpoint factory
+ * @param spec The activation spec
+ * @throws ResourceException Thrown if an error occurs
+ */
+ public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec) throws ResourceException
+ {
+ if (!_configured.getAndSet(true))
+ {
+ try
+ {
+ setup();
+ }
+ catch (QpidRAException e)
+ {
+ throw new ResourceException("Unable to create activation", e);
+ }
+ }
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
+ }
+
+ QpidActivation activation = new QpidActivation(this, endpointFactory, (QpidActivationSpec)spec);
+ _activations.put(spec, activation);
+ activation.start();
+ }
+
+ /**
+ * Endpoint deactivation
+ *
+ * @param endpointFactory The endpoint factory
+ * @param spec The activation spec
+ */
+ public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec spec)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("endpointDeactivation(" + endpointFactory + ", " + spec + ")");
+ }
+
+ QpidActivation activation = _activations.remove(spec);
+ if (activation != null)
+ {
+ activation.stop();
+ }
+ }
+
+ /**
+ * Get XA resources
+ *
+ * @param specs The activation specs
+ * @return The XA resources
+ * @throws ResourceException Thrown if an error occurs or unsupported
+ */
+ public XAResource[] getXAResources(final ActivationSpec[] specs) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getXAResources(" + specs + ")");
+ }
+
+ return null;
+ }
+
+ /**
+ * Start
+ *
+ * @param ctx The bootstrap context
+ * @throws ResourceAdapterInternalException
+ * Thrown if an error occurs
+ */
+ public void start(final BootstrapContext ctx) throws ResourceAdapterInternalException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start(" + ctx + ")");
+ }
+
+ locateTM();
+
+ this._ctx = ctx;
+
+ _log.info("Qpid resource adapter started");
+ }
+
+ /**
+ * Stop
+ */
+ public void stop()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ for (Map.Entry<ActivationSpec, QpidActivation> entry : _activations.entrySet())
+ {
+ try
+ {
+ entry.getValue().stop();
+ }
+ catch (Exception ignored)
+ {
+ _log.debug("Ignored", ignored);
+ }
+ }
+
+ _activations.clear();
+
+ _log.info("Qpid resource adapter stopped");
+ }
+
+ /**
+ * Get the user name
+ *
+ * @return The value
+ */
+ public String getDefaultUserName()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUserName()");
+ }
+
+ return _raProperties.getUserName();
+ }
+
+ /**
+ * Set the user name
+ *
+ * @param userName The value
+ */
+ public void setDefaultUserName(final String userName)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUserName(" + userName + ")");
+ }
+
+ _raProperties.setUserName(userName);
+ }
+
+ /**
+ * Get the password
+ *
+ * @return The value
+ */
+ public String getDefaultPassword()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPassword()");
+ }
+
+ return _raProperties.getPassword();
+ }
+
+ /**
+ * Set the password
+ *
+ * @param password The value
+ */
+ public void setDefaultPassword(final String password)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPassword(****)");
+ }
+
+ _raProperties.setPassword(password);
+ }
+
+ /**
+ * Get the client ID
+ *
+ * @return The value
+ */
+ public String getClientId()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getClientID()");
+ }
+
+ return _raProperties.getClientId();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param clientID The client id
+ */
+ public void setClientId(final String clientID)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setClientID(" + clientID + ")");
+ }
+
+ _raProperties.setClientId(clientID);
+ }
+
+ /**
+ * Get the host
+ *
+ * @return The value
+ */
+ public String getHost()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getHost()");
+ }
+
+ return _raProperties.getHost();
+ }
+
+ /**
+ * Set the host
+ *
+ * @param host The host
+ */
+ public void setHost(final String host)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setHost(" + host + ")");
+ }
+
+ _raProperties.setHost(host);
+ }
+
+ /**
+ * Get the port
+ *
+ * @return The value
+ */
+ public Integer getPort()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPort()");
+ }
+
+ return _raProperties.getPort();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param port The port
+ */
+ public void setPort(final Integer port)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPort(" + port + ")");
+ }
+
+ _raProperties.setPort(port);
+ }
+
+ /**
+ * Get the connection url
+ *
+ * @return The value
+ */
+ public String getPath()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPath()");
+ }
+
+ return _raProperties.getPath();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param path The path
+ */
+ public void setPath(final String path)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPath(" + path + ")");
+ }
+
+ _raProperties.setPath(path);
+ }
+
+ /**
+ * Get the connection url
+ *
+ * @return The value
+ */
+ public String getConnectionURL()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getConnectionURL()");
+ }
+
+ return _raProperties.getConnectionURL();
+ }
+
+ /**
+ * Set the client ID
+ *
+ * @param connectionURL The connection url
+ */
+ public void setConnectionURL(final String connectionURL)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setConnectionURL(" + connectionURL + ")");
+ }
+
+ _raProperties.setConnectionURL(connectionURL);
+ }
+
+ /**
+ * Get the transaction manager locator class
+ *
+ * @return The value
+ */
+ public String getTransactionManagerLocatorClass()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionManagerLocatorClass()");
+ }
+
+ return _raProperties.getTransactionManagerLocatorClass();
+ }
+
+ /**
+ * Set the transaction manager locator class
+ *
+ * @param locator The transaction manager locator class
+ */
+ public void setTransactionManagerLocatorClass(final String locator)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionManagerLocatorClass(" + locator + ")");
+ }
+
+ _raProperties.setTransactionManagerLocatorClass(locator);
+ }
+
+ /**
+ * Get the transaction manager locator method
+ *
+ * @return The value
+ */
+ public String getTransactionManagerLocatorMethod()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionManagerLocatorMethod()");
+ }
+
+ return _raProperties.getTransactionManagerLocatorMethod();
+ }
+
+ /**
+ * Set the transaction manager locator method
+ *
+ * @param method The transaction manager locator method
+ */
+ public void setTransactionManagerLocatorMethod(final String method)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionManagerLocatorMethod(" + method + ")");
+ }
+
+ _raProperties.setTransactionManagerLocatorMethod(method);
+ }
+
+ /**
+ * Get the use XA flag
+ *
+ * @return The value
+ */
+ public Boolean getUseLocalTx()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getUseLocalTx()");
+ }
+
+ return _raProperties.getUseLocalTx();
+ }
+
+ /**
+ * Set the use XA flag
+ *
+ * @param localTx The value
+ */
+ public void setUseLocalTx(final Boolean localTx)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setUseLocalTx(" + localTx + ")");
+ }
+
+ _raProperties.setUseLocalTx(localTx);
+ }
+
+ public Integer getSetupAttempts()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupAttempts()");
+ }
+ return _raProperties.getSetupAttempts();
+ }
+
+ public void setSetupAttempts(Integer setupAttempts)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupAttempts(" + setupAttempts + ")");
+ }
+ _raProperties.setSetupAttempts(setupAttempts);
+ }
+
+ public Long getSetupInterval()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupInterval()");
+ }
+ return _raProperties.getSetupInterval();
+ }
+
+ public void setSetupInterval(Long interval)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupInterval(" + interval + ")");
+ }
+ _raProperties.setSetupInterval(interval);
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ *
+ * @param obj Object with which to compare
+ * @return True if this object is the same as the obj argument; false otherwise.
+ */
+ public boolean equals(final Object obj)
+ {
+ if (obj == null)
+ {
+ return false;
+ }
+
+ if (obj instanceof QpidResourceAdapter)
+ {
+ return _raProperties.equals(((QpidResourceAdapter)obj).getProperties());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /**
+ * Return the hash code for the object
+ *
+ * @return The hash code
+ */
+ public int hashCode()
+ {
+ return _raProperties.hashCode();
+ }
+
+ /**
+ * Get the work manager
+ *
+ * @return The manager
+ */
+ public WorkManager getWorkManager()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getWorkManager()");
+ }
+
+ if (_ctx == null)
+ {
+ return null;
+ }
+
+ return _ctx.getWorkManager();
+ }
+
+ public XASession createXASession(final XAConnectionImpl connection)
+ throws Exception
+ {
+ final XASession result = connection.createXASession() ;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Using session " + Util.asString(result));
+ }
+ return result ;
+ }
+
+ public Session createSession(final AMQConnection connection,
+ final int ackMode,
+ final boolean useLocalTx,
+ final Integer prefetchLow,
+ final Integer prefetchHigh) throws Exception
+ {
+ Session result;
+
+ if (prefetchLow == null)
+ {
+ result = connection.createSession(useLocalTx, ackMode) ;
+ }
+ else if (prefetchHigh == null)
+ {
+ result = connection.createSession(useLocalTx, ackMode, prefetchLow) ;
+ }
+ else
+ {
+ result = connection.createSession(useLocalTx, ackMode, prefetchHigh, prefetchLow) ;
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Using session " + Util.asString(result));
+ }
+
+ return result;
+
+ }
+
+ /**
+ * Get the resource adapter properties
+ *
+ * @return The properties
+ */
+ protected QpidRAProperties getProperties()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getProperties()");
+ }
+
+ return _raProperties;
+ }
+
+ /**
+ * Setup the factory
+ */
+ protected void setup() throws QpidRAException
+ {
+ _defaultAMQConnectionFactory = createAMQConnectionFactory(_raProperties);
+ }
+
+
+ public AMQConnectionFactory getDefaultAMQConnectionFactory() throws ResourceException
+ {
+ if (!_configured.getAndSet(true))
+ {
+ try
+ {
+ setup();
+ }
+ catch (QpidRAException e)
+ {
+ throw new ResourceException("Unable to create activation", e);
+ }
+ }
+ return _defaultAMQConnectionFactory;
+ }
+
+ public AMQConnectionFactory createAMQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+ throws QpidRAException
+ {
+ try
+ {
+ return createFactory(overrideProperties);
+ }
+ catch (final URLSyntaxException urlse)
+ {
+ throw new QpidRAException("Unexpected exception creating connection factory", urlse) ;
+ }
+ }
+
+ public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
+ final Map<String, Object> overrideConnectionParams)
+ {
+ Map<String, Object> map = new HashMap<String, Object>();
+
+ if(connectionParams != null)
+ {
+ map.putAll(connectionParams);
+ }
+ if(overrideConnectionParams != null)
+ {
+ for (Map.Entry<String, Object> stringObjectEntry : overrideConnectionParams.entrySet())
+ {
+ map.put(stringObjectEntry.getKey(), stringObjectEntry.getValue());
+ }
+ }
+ return map;
+ }
+
+ private void locateTM()
+ {
+ if(_raProperties.getTransactionManagerLocatorClass() != null && _raProperties.getTransactionManagerLocatorMethod() != null)
+ {
+
+ String locatorClasses[] = _raProperties.getTransactionManagerLocatorClass().split(";");
+ String locatorMethods[] = _raProperties.getTransactionManagerLocatorMethod().split(";");
+
+ for (int i = 0 ; i < locatorClasses.length; i++)
+ {
+ _tm = Util.locateTM(locatorClasses[i], locatorMethods[i]);
+ if (_tm != null)
+ {
+ break;
+ }
+ }
+
+
+ }
+
+ if (_tm == null)
+ {
+ _log.warn("It wasn't possible to lookup a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod");
+ _log.warn("Qpid Resource Adapter won't be able to set and verify transaction timeouts in certain cases.");
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("TM located = " + _tm);
+ }
+ }
+ }
+
+
+ private AMQConnectionFactory createFactory(final ConnectionFactoryProperties overrideProperties)
+ throws URLSyntaxException, QpidRAException
+ {
+ final String overrideURL = overrideProperties.getConnectionURL() ;
+ final String url = overrideURL != null ? overrideURL : _raProperties.getConnectionURL() ;
+
+ final String overrideClientID = overrideProperties.getClientId() ;
+ final String clientID = (overrideClientID != null ? overrideClientID : _raProperties.getClientId()) ;
+
+ final String overrideDefaultPassword = overrideProperties.getPassword() ;
+ final String defaultPassword = (overrideDefaultPassword != null ? overrideDefaultPassword : _raProperties.getPassword()) ;
+
+ final String overrideDefaultUsername = overrideProperties.getUserName() ;
+ final String defaultUsername = (overrideDefaultUsername != null ? overrideDefaultUsername : _raProperties.getUserName()) ;
+
+ final String overrideHost = overrideProperties.getHost() ;
+ final String host = (overrideHost != null ? overrideHost : _raProperties.getHost()) ;
+
+ final Integer overridePort = overrideProperties.getPort() ;
+ final Integer port = (overridePort != null ? overridePort : _raProperties.getPort()) ;
+
+ final String overridePath = overrideProperties.getPath() ;
+ final String path = (overridePath != null ? overridePath : _raProperties.getPath()) ;
+
+ final AMQConnectionFactory cf ;
+
+ if (url != null)
+ {
+ cf = new AMQConnectionFactory(url) ;
+
+ if (clientID != null)
+ {
+ cf.getConnectionURL().setClientName(clientID) ;
+ }
+ }
+ else
+ {
+ // create a URL to force the connection details
+ if ((host == null) || (port == null) || (path == null))
+ {
+ throw new QpidRAException("Configuration requires host/port/path if connectionURL is not specified") ;
+ }
+ final String username = (defaultUsername != null ? defaultUsername : "") ;
+ final String password = (defaultPassword != null ? defaultPassword : "") ;
+ final String client = (clientID != null ? clientID : "") ;
+
+ final String newurl = AMQConnectionURL.AMQ_PROTOCOL + "://" + username +":" + password + "@" + client + "/" + path + '?' + AMQConnectionURL.OPTIONS_BROKERLIST + "='tcp://" + host + ':' + port + '\'' ;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Initialising connectionURL to " + newurl) ;
+ }
+
+ cf = new AMQConnectionFactory(newurl) ;
+ }
+
+ return cf ;
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+
+import javax.naming.Context;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.transaction.TransactionManager;
+
+import org.apache.qpid.ra.admin.QpidQueue;
+import org.apache.qpid.ra.admin.QpidTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Various utility functions
+ *
+ */
+public class Util
+{
+
+ private static final Logger _log = LoggerFactory.getLogger(Util.class);
+
+ /**
+ * Compare two strings.
+ * @param me First value
+ * @param you Second value
+ * @return True if object equals else false.
+ */
+ public static boolean compare(final String me, final String you)
+ {
+ // If both null or intern equals
+ if (me == you)
+ {
+ return true;
+ }
+
+ // if me null and you are not
+ if (me == null && you != null)
+ {
+ return false;
+ }
+
+ // me will not be null, test for equality
+ return me.equals(you);
+ }
+
+ /**
+ * Lookup an object in the default initial context
+ * @param context The context to use
+ * @param name the name to lookup
+ * @param clazz the expected type
+ * @return the object
+ * @throws Exception for any error
+ */
+ public static <T> T lookup(final Context context, final String name, final Class<T> clazz) throws Exception
+ {
+ Object object = context.lookup(name);
+
+ if (object instanceof Reference)
+ {
+
+ Reference ref = (Reference) object;
+ String addressContent = null;
+
+ if (ref.getClassName().equals(QpidQueue.class.getName()))
+ {
+ RefAddr addr = ref.get(QpidQueue.class.getName());
+ addressContent = (String) addr.getContent();
+
+ if (addr != null)
+ {
+ return (T)new QpidQueue(addressContent);
+ }
+ }
+
+ if (ref.getClassName().equals(QpidTopic.class.getName()))
+ {
+ RefAddr addr = ref.get(QpidTopic.class.getName());
+ addressContent = (String) addr.getContent();
+
+ if (addr != null)
+ {
+ return (T)new QpidTopic(addressContent);
+ }
+ }
+ }
+
+ return clazz.cast(object);
+
+ }
+
+ /** The Resource adapter can't depend on any provider's specific library. Because of that we use reflection to locate the
+ * transaction manager during startup.
+ *
+ *
+ * TODO: We should use a proper SPI instead of reflection
+ * We would need to define a proper SPI package for this.
+ **/
+ public static TransactionManager locateTM(final String locatorClass, final String locatorMethod)
+ {
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> aClass = loader.loadClass(locatorClass);
+ Object o = aClass.newInstance();
+ Method m = aClass.getMethod(locatorMethod);
+ return (TransactionManager)m.invoke(o);
+ }
+ catch (Throwable e)
+ {
+ _log.debug(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * Serialize the object into a byte array.
+ * @param serializable The serializable object
+ * @return The generated byte array
+ * @throws IOException For errors during serialization.
+ */
+ public static byte[] serialize(final Serializable serializable)
+ throws IOException
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream() ;
+ final ObjectOutputStream oos = new ObjectOutputStream(baos) ;
+ oos.writeObject(serializable) ;
+ oos.close() ;
+ return baos.toByteArray() ;
+ }
+
+ /**
+ * Deserialize the byte array into an object.
+ * @param data The serialized object as a byte array
+ * @return The serializable object.
+ * @throws IOException For errors during deserialization
+ * @throws ClassNotFoundException If the deserialized class cannot be found.
+ */
+ public static Object deserialize(final byte[] data)
+ throws IOException, ClassNotFoundException
+ {
+ final ByteArrayInputStream bais = new ByteArrayInputStream(data) ;
+ final ObjectInputStream ois = new ObjectInputStream(bais) ;
+ return ois.readObject() ;
+ }
+
+ /**
+ * Return a string identification for the specified object.
+ * @param object The object value.
+ * @return The string identification.
+ */
+ public static String asString(final Object object)
+ {
+ return (object == null ? "null" : object.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(object))) ;
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/AdminObjectFactory.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import java.util.Hashtable;
+
+import javax.naming.Context;
+import javax.naming.Name;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.spi.ObjectFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AdminObjectFactory implements ObjectFactory
+{
+ private static final Logger _log = LoggerFactory.getLogger(AdminObjectFactory.class);
+
+ @Override
+ public Object getObjectInstance(Object object, Name name, Context context, Hashtable<?, ?> env) throws Exception
+ {
+
+ Object instance = null;
+
+ if (object instanceof Reference)
+ {
+ Reference ref = (Reference) object;
+ String bindingURLString;
+
+ if (ref.getClassName().equals(QpidQueue.class.getName()))
+ {
+ RefAddr addr = ref.get(QpidQueue.class.getName());
+ bindingURLString = (String) addr.getContent();
+
+ if (addr != null)
+ {
+ return new QpidQueue(bindingURLString);
+ }
+
+ }
+
+ if (ref.getClassName().equals(QpidTopic.class.getName()))
+ {
+ RefAddr addr = ref.get(QpidTopic.class.getName());
+ bindingURLString = (String) addr.getContent();
+
+ if (addr != null)
+ {
+ return new QpidTopic(bindingURLString);
+ }
+ }
+ }
+ return instance;
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidBindingURL.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import java.net.URISyntaxException;
+
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+public class QpidBindingURL extends AMQBindingURL {
+
+ private String _url;
+
+ public QpidBindingURL(String url) throws URISyntaxException {
+ super(url);
+
+ if (!url.contains(BindingURL.OPTION_ROUTING_KEY) || getRoutingKey() == null) {
+ setOption(BindingURL.OPTION_ROUTING_KEY, null);
+ }
+
+ this._url = url;
+ }
+
+ @Override
+ public String getURL() {
+ return _url;
+ }
+
+ @Override
+ public String toString() {
+ return _url;
+ }
+
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidConnectionFactoryProxy.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra.admin;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.spi.ObjectFactory;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ */
+public class QpidConnectionFactoryProxy implements Externalizable, Referenceable, ConnectionFactory, Serializable
+{
+ private static final Logger _log = LoggerFactory.getLogger(QpidDestinationProxy.class);
+
+ private String _connectionURL;
+
+ private ConnectionFactory _delegate;
+
+ /**
+ * This constructor should not only be used be de-serialisation code. Create
+ * original object with the other constructor.
+ */
+ public QpidConnectionFactoryProxy()
+ {
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ Reference ref = (Reference) in.readObject();
+
+ try
+ {
+ _delegate = (ConnectionFactory) dereference(ref);
+
+ } catch (Exception e)
+ {
+ _log.error("Failed to dereference ConnectionFactory " + e.getMessage(), e);
+ throw new IOException("Failed to dereference ConnectionFactory: " + e.getMessage());
+ }
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ if (_delegate == null)
+ {
+ _log.error("Null Destination ");
+ throw new IOException("Null ConnectionFactory!");
+ }
+
+ try
+ {
+ out.writeObject(((Referenceable) _delegate).getReference());
+ }
+ catch (NamingException e)
+ {
+ _log.error("Failed to dereference ConnectionFactory " + e.getMessage(), e);
+ throw new IOException("Failed to dereference ConnectionFactory: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public Reference getReference() throws NamingException
+ {
+ try
+ {
+ _delegate = new AMQConnectionFactory(getConnectionURL());
+ /*
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory();
+ mcf.setResourceAdapter(ra);
+ mcf.setConnectionURL(getConnectionURL());
+ delegate = new QpidRAConnectionFactoryImpl(mcf, null);
+ */
+ return ((Referenceable) _delegate).getReference();
+ }
+ catch(Exception e)
+ {
+ throw new NamingException(e.getMessage());
+ }
+ }
+ private Object dereference(Reference ref) throws Exception
+ {
+ ObjectFactory objFactory = (ObjectFactory) Class.forName(
+ ref.getFactoryClassName()).newInstance();
+ return objFactory.getObjectInstance(ref, null, null, null);
+ }
+
+ public void setConnectionURL(final String connectionURL)
+ {
+ this._connectionURL = connectionURL;
+ }
+ public String getConnectionURL()
+ {
+ return this._connectionURL;
+ }
+
+ /**
+ * Create a connection
+ * @return The connection
+ * @exception JMSException Thrown if the operation fails
+ */
+ public Connection createConnection() throws JMSException
+ {
+ return _delegate.createConnection();
+ }
+
+ /**
+ * Create a connection
+ * @param userName The user name
+ * @param password The password
+ * @return The connection
+ * @exception JMSException Thrown if the operation fails
+ */
+ public Connection createConnection(final String userName, final String password) throws JMSException
+ {
+ return _delegate.createConnection(userName, password);
+ }
+
+}
+
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidDestinationProxy.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.ra.admin;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+import javax.jms.Destination;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.spi.ObjectFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The QpidDestinationProxy provides for allowing an administrator/developer to
+ * create and bind QPID destinations into a JNDI tree. AdminObjects are used as
+ * an generic integration point rather than relying on the EE server specific
+ * API's to create destinations (queues, topics). AdminObjects and associated
+ * properties are defined in the ra.xml file for a particular JCA adapter.
+ * Please see the ra.xml file for the QPID JCA resource adapter as well as the
+ * README.txt for the adapter for more details.
+ *
+ */
+public class QpidDestinationProxy implements Externalizable, Referenceable, Destination, Serializable
+{
+ private static final long serialVersionUID = -1137413782643796461L;
+
+ private static final Logger _log = LoggerFactory.getLogger(QpidDestinationProxy.class);
+
+ private static final String DEFAULT_QUEUE_TYPE = "QUEUE";
+
+ private static final String DEFAULT_TOPIC_TYPE = "TOPIC";
+
+ private String _destinationAddress;
+
+ private String _destinationType;
+
+ private Destination _delegate;
+
+ /**
+ * This constructor should not only be used be de-serialisation code. Create
+ * original object with the other constructor.
+ */
+ public QpidDestinationProxy()
+ {
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ Reference ref = (Reference) in.readObject();
+
+ try
+ {
+ _delegate = (Destination) dereference(ref);
+
+ } catch (Exception e)
+ {
+ _log.error("Failed to dereference Destination " + e.getMessage(), e);
+ throw new IOException("Failed to dereference Destination: " + e.getMessage());
+ }
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ if (_delegate == null)
+ {
+ _log.error("Null Destination ");
+ throw new IOException("Null destination!");
+ }
+
+ try
+ {
+ out.writeObject(((Referenceable) _delegate).getReference());
+ }
+ catch (NamingException e)
+ {
+ _log.error("Failed to dereference Destination " + e.getMessage(), e);
+ throw new IOException("Failed to dereference Destination: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public Reference getReference() throws NamingException
+ {
+ try
+ {
+ if(getDestinationType().equalsIgnoreCase(DEFAULT_QUEUE_TYPE))
+ {
+ _delegate = new QpidQueue(getDestinationAddress());
+ }
+ else if(getDestinationType().equalsIgnoreCase(DEFAULT_TOPIC_TYPE))
+ {
+ _delegate = new QpidTopic(getDestinationAddress());
+ }
+ else
+ {
+ throw new IllegalStateException("Unknown destination type " + getDestinationType());
+ }
+
+ return ((Referenceable) _delegate).getReference();
+
+ }
+ catch(Exception e)
+ {
+ _log.error(e.getMessage(),e);
+ throw new NamingException("Failed to create destination " + e.getMessage());
+ }
+
+ }
+
+ private Object dereference(Reference ref) throws Exception
+ {
+ ObjectFactory objFactory = (ObjectFactory) Class.forName(
+ ref.getFactoryClassName()).newInstance();
+ return objFactory.getObjectInstance(ref, null, null, null);
+ }
+
+ public void setDestinationAddress(String destinationAddress) throws Exception
+ {
+ this._destinationAddress = destinationAddress;
+ }
+
+ public String getDestinationAddress()
+ {
+ return this._destinationAddress;
+ }
+
+ public void setDestinationType(String destinationType)
+ {
+ this._destinationType = destinationType;
+ }
+
+ public String getDestinationType()
+ {
+ return this._destinationType;
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidQueue.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpid.client.AMQQueue;
+
+public class QpidQueue extends AMQQueue
+{
+ private String _url;
+
+ public QpidQueue(final String address) throws Exception
+ {
+ super(address);
+ this._url = address;
+ }
+
+ @Override
+ public Reference getReference() throws NamingException
+ {
+ return new Reference(this.getClass().getName(), new StringRefAddr(this.getClass().getName(), toURL()),
+ AdminObjectFactory.class.getName(), null);
+ }
+
+ @Override
+ public String toURL()
+ {
+ return _url;
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/admin/QpidTopic.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.admin;
+
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpid.client.AMQTopic;
+
+public class QpidTopic extends AMQTopic
+{
+ private String _url;
+
+ public QpidTopic(final String address) throws Exception
+ {
+ super(address);
+ this._url = address;
+ }
+
+ @Override
+ public Reference getReference() throws NamingException
+ {
+ return new Reference(this.getClass().getName(), new StringRefAddr(this.getClass().getName(), toURL()),
+ AdminObjectFactory.class.getName(), null);
+ }
+
+ @Override
+ public String toURL()
+ {
+ return _url;
+ }
+
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivation.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,593 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.XAConnectionImpl;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ra.QpidResourceAdapter;
+import org.apache.qpid.ra.Util;
+
+/**
+ * The activation.
+ *
+ */
+public class QpidActivation implements ExceptionListener
+{
+ /**
+ * The logger
+ */
+ private static final Logger _log = LoggerFactory.getLogger(QpidActivation.class);
+
+ /**
+ * The onMessage method
+ */
+ public static final Method ONMESSAGE;
+
+ /**
+ * The resource adapter
+ */
+ private final QpidResourceAdapter _ra;
+
+ /**
+ * The activation spec
+ */
+ private final QpidActivationSpec _spec;
+
+ /**
+ * The message endpoint factory
+ */
+ private final MessageEndpointFactory _endpointFactory;
+
+ /**
+ * Whether delivery is active
+ */
+ private final AtomicBoolean _deliveryActive = new AtomicBoolean(false);
+
+ /**
+ * The destination type
+ */
+ private boolean _isTopic = false;
+
+ /**
+ * Is the delivery transacted
+ */
+ private boolean _isDeliveryTransacted;
+
+ private Destination _destination;
+
+ /**
+ * The connection
+ */
+ private Connection _connection;
+
+ private final List<QpidMessageHandler> _handlers = new ArrayList<QpidMessageHandler>();
+
+ private AMQConnectionFactory _factory;
+
+ // Whether we are in the failure recovery loop
+ private AtomicBoolean _inFailure = new AtomicBoolean(false);
+
+ static
+ {
+ try
+ {
+ ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param ra The resource adapter
+ * @param endpointFactory The endpoint factory
+ * @param spec The activation spec
+ * @throws ResourceException Thrown if an error occurs
+ */
+ public QpidActivation(final QpidResourceAdapter ra,
+ final MessageEndpointFactory endpointFactory,
+ final QpidActivationSpec spec) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+ }
+
+ this._ra = ra;
+ this._endpointFactory = endpointFactory;
+ this._spec = spec;
+ try
+ {
+ _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
+ }
+ catch (Exception e)
+ {
+ throw new ResourceException(e);
+ }
+ }
+
+ /**
+ * Get the activation spec
+ *
+ * @return The value
+ */
+ public QpidActivationSpec getActivationSpec()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getActivationSpec()");
+ }
+
+ return _spec;
+ }
+
+ /**
+ * Get the message endpoint factory
+ *
+ * @return The value
+ */
+ public MessageEndpointFactory getMessageEndpointFactory()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMessageEndpointFactory()");
+ }
+
+ return _endpointFactory;
+ }
+
+ /**
+ * Get whether delivery is transacted
+ *
+ * @return The value
+ */
+ public boolean isDeliveryTransacted()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isDeliveryTransacted()");
+ }
+
+ return _isDeliveryTransacted;
+ }
+
+ /**
+ * Get the work manager
+ *
+ * @return The value
+ */
+ public WorkManager getWorkManager()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getWorkManager()");
+ }
+
+ return _ra.getWorkManager();
+ }
+
+ /**
+ * Is the destination a topic
+ *
+ * @return The value
+ */
+ public boolean isTopic()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isTopic()");
+ }
+
+ return _isTopic;
+ }
+
+ /**
+ * Start the activation
+ *
+ * @throws ResourceException Thrown if an error occurs
+ */
+ public void start() throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("start()");
+ }
+ _deliveryActive.set(true);
+ _ra.getWorkManager().scheduleWork(new SetupActivation());
+ }
+
+ /**
+ * Stop the activation
+ */
+ public void stop()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("stop()");
+ }
+
+ _deliveryActive.set(false);
+ teardown();
+ }
+
+ /**
+ * Setup the activation
+ *
+ * @throws Exception Thrown if an error occurs
+ */
+ protected synchronized void setup() throws Exception
+ {
+ _log.debug("Setting up " + _spec);
+ setupCF();
+
+ setupDestination();
+ final AMQConnection amqConnection ;
+ final boolean useLocalTx = _spec.isUseLocalTx() ;
+ final boolean isXA = _isDeliveryTransacted && !useLocalTx ;
+
+ if (isXA)
+ {
+ amqConnection = (XAConnectionImpl)_factory.createXAConnection() ;
+ }
+ else
+ {
+ amqConnection = (AMQConnection)_factory.createConnection() ;
+ }
+
+ amqConnection.setExceptionListener(this) ;
+
+ for (int i = 0; i < _spec.getMaxSession(); i++)
+ {
+ Session session = null;
+
+ try
+ {
+ if (isXA)
+ {
+ session = _ra.createXASession((XAConnectionImpl)amqConnection) ;
+ }
+ else
+ {
+ session = _ra.createSession((AMQConnection)amqConnection,
+ _spec.getAcknowledgeModeInt(),
+ useLocalTx,
+ _spec.getPrefetchLow(),
+ _spec.getPrefetchHigh());
+ }
+
+ _log.debug("Using session " + Util.asString(session));
+ QpidMessageHandler handler = new QpidMessageHandler(this, _ra.getTM(), session);
+ handler.setup();
+ _handlers.add(handler);
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ amqConnection.close() ;
+ }
+ catch (Exception e2)
+ {
+ _log.trace("Ignored error closing connection", e2);
+ }
+
+ throw e;
+ }
+ }
+ amqConnection.start() ;
+ this._connection = amqConnection ;
+
+ _log.debug("Setup complete " + this);
+ }
+
+ /**
+ * Teardown the activation
+ */
+ protected synchronized void teardown()
+ {
+ _log.debug("Tearing down " + _spec);
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error stopping connection " + Util.asString(_connection), t);
+ }
+
+ for (QpidMessageHandler handler : _handlers)
+ {
+ handler.teardown();
+ }
+
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error closing connection " + Util.asString(_connection), t);
+ }
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = null;
+ }
+ _log.debug("Tearing down complete " + this);
+ }
+
+ protected void setupCF() throws Exception
+ {
+ if (_spec.isHasBeenUpdated())
+ {
+ _factory = _ra.createAMQConnectionFactory(_spec);
+ }
+ else
+ {
+ _factory = _ra.getDefaultAMQConnectionFactory();
+ }
+ }
+
+ public Destination getDestination()
+ {
+ return _destination;
+ }
+
+ protected void setupDestination() throws Exception
+ {
+
+ String destinationName = _spec.getDestination();
+ String destinationTypeString = _spec.getDestinationType();
+
+ if (_spec.isUseJNDI())
+ {
+ Context ctx = new InitialContext();
+ _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setupDestination(" + ctx + ")");
+ }
+
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+
+ Class<? extends Destination> destinationType;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ destinationType = Topic.class;
+ _isTopic = true;
+ }
+ else
+ {
+ destinationType = Queue.class;
+ }
+
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ destinationType.getName());
+ _destination = Util.lookup(ctx, destinationName, destinationType);
+ //_destination = (Destination)ctx.lookup(destinationName);
+
+ }
+ else
+ {
+ _log.debug("Destination type not defined");
+ _log.debug("Retrieving destination " + destinationName +
+ " of type " +
+ Destination.class.getName());
+
+ _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+ else
+ {
+ _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
+ {
+ _log.debug("Destination type defined as " + destinationTypeString);
+ final boolean match ;
+ if (Topic.class.getName().equals(destinationTypeString))
+ {
+ match = (_destination instanceof Topic) ;
+ _isTopic = true;
+ }
+ else
+ {
+ match = (_destination instanceof Queue) ;
+ }
+ if (!match)
+ {
+ throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination) ;
+ }
+ }
+ else
+ {
+ _isTopic = !(_destination instanceof Queue) ;
+ }
+ }
+
+ _log.debug("Got destination " + _destination + " from " + destinationName);
+ }
+
+ /**
+ * Get a string representation
+ *
+ * @return The value
+ */
+ @Override
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(QpidActivation.class.getName()).append('(');
+ buffer.append("spec=").append(_spec.getClass().getName());
+ buffer.append(" mepf=").append(_endpointFactory.getClass().getName());
+ buffer.append(" active=").append(_deliveryActive.get());
+ if (_spec.getDestination() != null)
+ {
+ buffer.append(" destination=").append(_spec.getDestination());
+ }
+ buffer.append(" transacted=").append(_isDeliveryTransacted);
+ buffer.append(')');
+ return buffer.toString();
+ }
+
+ public void onException(final JMSException jmse)
+ {
+ handleFailure(jmse) ;
+ }
+
+ /**
+ * Handles any failure by trying to reconnect
+ *
+ * @param failure the reason for the failure
+ */
+ public void handleFailure(Throwable failure)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
+ }
+ else
+ {
+ _log.warn("Failure in Qpid activation " + _spec, failure);
+ }
+ int reconnectCount = 0;
+ int setupAttempts = _spec.getSetupAttempts();
+ long setupInterval = _spec.getSetupInterval();
+
+ // Only enter the failure loop once
+ if (_inFailure.getAndSet(true))
+ return;
+ try
+ {
+ while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+ {
+ teardown();
+
+ try
+ {
+ Thread.sleep(setupInterval);
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted trying to reconnect " + _spec, e);
+ break;
+ }
+
+ _log.info("Attempting to reconnect " + _spec);
+ try
+ {
+ setup();
+ _log.info("Reconnected with Qpid");
+ break;
+ }
+ catch (Throwable t)
+ {
+ if(doesNotExist(failure))
+ {
+ _log.info("awaiting topic/queue creation " + getActivationSpec().getDestination());
+ }
+ else
+ {
+ _log.error("Unable to reconnect " + _spec, t);
+ }
+ }
+ ++reconnectCount;
+ }
+ }
+ finally
+ {
+ // Leaving failure recovery loop
+ _inFailure.set(false);
+ }
+ }
+
+ /**
+ * Check to see if the failure represents a missing endpoint
+ * @param failure The failure.
+ * @return true if it represents a missing endpoint, false otherwise
+ */
+ private boolean doesNotExist(final Throwable failure)
+ {
+ return (failure instanceof AMQException) && (((AMQException)failure).getErrorCode() == AMQConstant.NOT_FOUND) ;
+ }
+
+ /**
+ * Handles the setup
+ */
+ private class SetupActivation implements Work
+ {
+ public void run()
+ {
+ try
+ {
+ setup();
+ }
+ catch (Throwable t)
+ {
+ handleFailure(t);
+ }
+ }
+
+ public void release()
+ {
+ }
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidActivationSpec.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,604 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import java.io.Serializable;
+
+import javax.jms.Session;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.InvalidPropertyException;
+import javax.resource.spi.ResourceAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.ra.ConnectionFactoryProperties;
+import org.apache.qpid.ra.QpidResourceAdapter;
+
+/**
+ * The activation spec
+ * These properties are set on the MDB ActivactionProperties
+ *
+ */
+public class QpidActivationSpec extends ConnectionFactoryProperties implements ActivationSpec, Serializable
+{
+ private static final long serialVersionUID = 7379131936083146158L;
+
+ private static final int DEFAULT_MAX_SESSION = 15;
+
+ /** The logger */
+ private static final transient Logger _log = LoggerFactory.getLogger(QpidActivationSpec.class);
+
+ /** The resource adapter */
+ private QpidResourceAdapter _ra;
+
+ /** The destination */
+ private String _destination;
+
+ /** The destination type */
+ private String _destinationType;
+
+ /** The message selector */
+ private String _messageSelector;
+
+ /** The acknowledgement mode */
+ private int _acknowledgeMode;
+
+ /** The subscription durability */
+ private boolean _subscriptionDurability;
+
+ /** The subscription name */
+ private String _subscriptionName;
+
+ /** The maximum number of sessions */
+ private Integer _maxSession;
+
+ /** Transaction timeout */
+ private Integer _transactionTimeout;
+
+ /** prefetch low */
+ private Integer _prefetchLow;
+
+ /** prefetch high */
+ private Integer _prefetchHigh;
+
+ private boolean _useJNDI = true;
+
+ // undefined by default, default is specified at the RA level in QpidRAProperties
+ private Integer _setupAttempts;
+
+ // undefined by default, default is specified at the RA level in QpidRAProperties
+ private Long _setupInterval;
+
+ /**
+ * Constructor
+ */
+ public QpidActivationSpec()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("constructor()");
+ }
+
+ _acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ _maxSession = DEFAULT_MAX_SESSION;
+ _transactionTimeout = 0;
+ }
+
+ /**
+ * Get the resource adapter
+ * @return The resource adapter
+ */
+ public ResourceAdapter getResourceAdapter()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getResourceAdapter()");
+ }
+
+ return _ra;
+ }
+
+ /**
+ * @return the useJNDI
+ */
+ public boolean isUseJNDI()
+ {
+ return _useJNDI;
+ }
+
+ /**
+ * @param value the useJNDI to set
+ */
+ public void setUseJNDI(final boolean value)
+ {
+ _useJNDI = value;
+ }
+
+ /**
+ * Set the resource adapter
+ * @param ra The resource adapter
+ * @exception ResourceException Thrown if incorrect resource adapter
+ */
+ public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setResourceAdapter(" + ra + ")");
+ }
+
+ if (ra == null || !(ra instanceof QpidResourceAdapter))
+ {
+ throw new ResourceException("Resource adapter is " + ra);
+ }
+
+ this._ra = (QpidResourceAdapter)ra;
+ }
+
+ /**
+ * Get the destination
+ * @return The value
+ */
+ public String getDestination()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDestination()");
+ }
+
+ return _destination;
+ }
+
+ /**
+ * Set the destination
+ * @param value The value
+ */
+ public void setDestination(final String value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setDestination(" + value + ")");
+ }
+
+ _destination = value;
+ }
+
+ /**
+ * Get the destination type
+ * @return The value
+ */
+ public String getDestinationType()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getDestinationType()");
+ }
+
+ return _destinationType;
+ }
+
+ /**
+ * Set the destination type
+ * @param value The value
+ */
+ public void setDestinationType(final String value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setDestinationType(" + value + ")");
+ }
+
+ _destinationType = value;
+ }
+
+ /**
+ * Get the message selector
+ * @return The value
+ */
+ public String getMessageSelector()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMessageSelector()");
+ }
+
+ return _messageSelector;
+ }
+
+ /**
+ * Set the message selector
+ * @param value The value
+ */
+ public void setMessageSelector(final String value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setMessageSelector(" + value + ")");
+ }
+
+ _messageSelector = value;
+ }
+
+ /**
+ * Get the acknowledge mode
+ * @return The value
+ */
+ public String getAcknowledgeMode()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getAcknowledgeMode()");
+ }
+
+ if (Session.DUPS_OK_ACKNOWLEDGE == _acknowledgeMode)
+ {
+ return "Dups-ok-acknowledge";
+ }
+ else
+ {
+ return "Auto-acknowledge";
+ }
+ }
+
+ /**
+ * Set the acknowledge mode
+ * @param value The value
+ */
+ public void setAcknowledgeMode(final String value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setAcknowledgeMode(" + value + ")");
+ }
+
+ if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(value) || "Dups-ok-acknowledge".equalsIgnoreCase(value))
+ {
+ _acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
+ }
+ else if ("AUTO_ACKNOWLEDGE".equalsIgnoreCase(value) || "Auto-acknowledge".equalsIgnoreCase(value))
+ {
+ _acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported acknowledgement mode " + value);
+ }
+ }
+
+ /**
+ * @return the acknowledgement mode
+ */
+ public int getAcknowledgeModeInt()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getAcknowledgeMode()");
+ }
+
+ return _acknowledgeMode;
+ }
+
+ /**
+ * Get the subscription durability
+ * @return The value
+ */
+ public String getSubscriptionDurability()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSubscriptionDurability()");
+ }
+
+ if (_subscriptionDurability)
+ {
+ return "Durable";
+ }
+ else
+ {
+ return "NonDurable";
+ }
+ }
+
+ /**
+ * Set the subscription durability
+ * @param value The value
+ */
+ public void setSubscriptionDurability(final String value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSubscriptionDurability(" + value + ")");
+ }
+
+ _subscriptionDurability = "Durable".equals(value);
+ }
+
+ /**
+ * Get the status of subscription durability
+ * @return The value
+ */
+ public boolean isSubscriptionDurable()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("isSubscriptionDurable()");
+ }
+
+ return _subscriptionDurability;
+ }
+
+ /**
+ * Get the subscription name
+ * @return The value
+ */
+ public String getSubscriptionName()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSubscriptionName()");
+ }
+
+ return _subscriptionName;
+ }
+
+ /**
+ * Set the subscription name
+ * @param value The value
+ */
+ public void setSubscriptionName(final String value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSubscriptionName(" + value + ")");
+ }
+
+ _subscriptionName = value;
+ }
+
+ /**
+ * Get the number of max session
+ * @return The value
+ */
+ public Integer getMaxSession()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getMaxSession()");
+ }
+
+ if (_maxSession == null)
+ {
+ return DEFAULT_MAX_SESSION;
+ }
+
+ return _maxSession;
+ }
+
+ /**
+ * Set the number of max session
+ * @param value The value
+ */
+ public void setMaxSession(final Integer value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setMaxSession(" + value + ")");
+ }
+
+ _maxSession = value;
+ }
+
+ /**
+ * Get the transaction timeout
+ * @return The value
+ */
+ public Integer getTransactionTimeout()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getTransactionTimeout()");
+ }
+
+ return _transactionTimeout;
+ }
+
+ /**
+ * Set the transaction timeout
+ * @param value The value
+ */
+ public void setTransactionTimeout(final Integer value)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setTransactionTimeout(" + value + ")");
+ }
+
+ _transactionTimeout = value;
+ }
+
+ /**
+ * Get the prefetch low
+ * @return The value
+ */
+ public Integer getPrefetchLow()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPrefetchLow()");
+ }
+
+ return _prefetchLow;
+ }
+
+ /**
+ * Set the prefetch low
+ * @param value The value
+ */
+ public void setPrefetchLow(final Integer prefetchLow)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPrefetchLow(" + prefetchLow + ")");
+ }
+
+ this._prefetchLow = prefetchLow;
+ }
+
+ /**
+ * Get the prefetch high
+ * @return The value
+ */
+ public Integer getPrefetchHigh()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getPrefetchHigh()");
+ }
+
+ return _prefetchHigh;
+ }
+
+ /**
+ * Set the prefetch high
+ * @param value The value
+ */
+ public void setPrefetchHigh(final Integer prefetchHigh)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setPrefetchHigh(" + prefetchHigh + ")");
+ }
+
+ this._prefetchHigh = prefetchHigh;
+ }
+
+ public int getSetupAttempts()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupAttempts()");
+ }
+
+ if (_setupAttempts == null)
+ {
+ return _ra.getSetupAttempts();
+ }
+ else
+ {
+ return _setupAttempts;
+ }
+ }
+
+ public void setSetupAttempts(int setupAttempts)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupAttempts(" + setupAttempts + ")");
+ }
+
+ this._setupAttempts = setupAttempts;
+ }
+
+ public long getSetupInterval()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("getSetupInterval()");
+ }
+
+ if (_setupInterval == null)
+ {
+ return _ra.getSetupInterval();
+ }
+ else
+ {
+ return _setupInterval;
+ }
+ }
+
+ public void setSetupInterval(long setupInterval)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setSetupInterval(" + setupInterval + ")");
+ }
+
+ this._setupInterval = setupInterval;
+ }
+
+ /**
+ * Validate
+ * @exception InvalidPropertyException Thrown if a validation exception occurs
+ */
+ public void validate() throws InvalidPropertyException
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("validate()");
+ }
+
+ if (_destination == null || _destination.trim().equals(""))
+ {
+ throw new InvalidPropertyException("Destination is mandatory");
+ }
+ }
+
+ /**
+ * Get a string representation
+ * @return The value
+ */
+ @Override
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(QpidActivationSpec.class.getName()).append('(');
+ buffer.append("ra=").append(_ra);
+ buffer.append(" destination=").append(_destination);
+ buffer.append(" destinationType=").append(_destinationType);
+ if (_messageSelector != null)
+ {
+ buffer.append(" selector=").append(_messageSelector);
+ }
+ buffer.append(" ack=").append(getAcknowledgeMode());
+ buffer.append(" durable=").append(_subscriptionDurability);
+ buffer.append(" clientID=").append(getClientId());
+ if (_subscriptionName != null)
+ {
+ buffer.append(" subscription=").append(_subscriptionName);
+ }
+ buffer.append(" user=").append(getUserName());
+ if (getPassword() != null)
+ {
+ buffer.append(" password=").append("****");
+ }
+ buffer.append(" maxSession=").append(_maxSession);
+ if (_prefetchLow != null)
+ {
+ buffer.append(" prefetchLow=").append(_prefetchLow);
+ }
+ if (_prefetchHigh != null)
+ {
+ buffer.append(" prefetchHigh=").append(_prefetchHigh);
+ }
+ buffer.append(')');
+ return buffer.toString();
+ }
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidMessageHandler.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,245 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra.inflow;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XASession;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Status;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.qpid.ra.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The message handler
+ *
+ */
+public class QpidMessageHandler implements MessageListener
+{
+ /**
+ * The logger
+ */
+ private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
+
+ /**
+ * The session
+ */
+ private final Session _session;
+
+ private MessageConsumer _consumer;
+
+ /**
+ * The endpoint
+ */
+ private MessageEndpoint _endpoint;
+
+ private final QpidActivation _activation;
+
+ private boolean _useLocalTx;
+
+ private boolean _transacted;
+
+ private final TransactionManager _tm;
+
+ public QpidMessageHandler(final QpidActivation activation,
+ final TransactionManager tm,
+ final Session session)
+ {
+ this._activation = activation;
+ this._session = session;
+ this._tm = tm;
+ }
+
+ public void setup() throws Exception
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("setup()");
+ }
+
+ QpidActivationSpec spec = _activation.getActivationSpec();
+ String selector = spec.getMessageSelector();
+
+ // Create the message consumer
+ if (_activation.isTopic())
+ {
+ final Topic topic = (Topic) _activation.getDestination();
+ final String subscriptionName = spec.getSubscriptionName();
+ if (spec.isSubscriptionDurable())
+ _consumer = _session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ else
+ _consumer = _session.createConsumer(topic, selector) ;
+ }
+ else
+ {
+ final Queue queue = (Queue) _activation.getDestination();
+ _consumer = _session.createConsumer(queue, selector);
+ }
+
+ // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
+ MessageEndpointFactory endpointFactory = _activation.getMessageEndpointFactory();
+ _useLocalTx = _activation.getActivationSpec().isUseLocalTx();
+ _transacted = _activation.isDeliveryTransacted() || _useLocalTx ;
+ if (_activation.isDeliveryTransacted() && !_activation.getActivationSpec().isUseLocalTx())
+ {
+ final XAResource xaResource = ((XASession)_session).getXAResource() ;
+ _endpoint = endpointFactory.createEndpoint(xaResource);
+ }
+ else
+ {
+ _endpoint = endpointFactory.createEndpoint(null);
+ }
+ _consumer.setMessageListener(this);
+ }
+
+ /**
+ * Stop the handler
+ */
+ public void teardown()
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("teardown()");
+ }
+
+ try
+ {
+ if (_endpoint != null)
+ {
+ _endpoint.release();
+ _endpoint = null;
+ }
+ }
+ catch (Throwable t)
+ {
+ _log.debug("Error releasing endpoint " + _endpoint, t);
+ }
+ }
+
+ public void onMessage(final Message message)
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("onMessage(" + Util.asString(message) + ")");
+ }
+
+ boolean beforeDelivery = false;
+
+ try
+ {
+ if (_activation.getActivationSpec().getTransactionTimeout() > 0 && _tm != null)
+ {
+ _tm.setTransactionTimeout(_activation.getActivationSpec().getTransactionTimeout());
+ }
+
+ _endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
+ beforeDelivery = true;
+
+ if(_transacted)
+ {
+ message.acknowledge();
+ }
+
+ ((MessageListener)_endpoint).onMessage(message);
+
+ if (_transacted && (_tm.getTransaction() != null))
+ {
+ final int status = _tm.getStatus() ;
+ final boolean rollback = status == Status.STATUS_MARKED_ROLLBACK
+ || status == Status.STATUS_ROLLING_BACK
+ || status == Status.STATUS_ROLLEDBACK;
+ if (rollback)
+ {
+ _session.recover() ;
+ }
+ }
+ else
+ {
+ message.acknowledge();
+ }
+
+ try
+ {
+ _endpoint.afterDelivery();
+ }
+ catch (ResourceException e)
+ {
+ _log.warn("Unable to call after delivery", e);
+ return;
+ }
+ if (_useLocalTx)
+ {
+ _session.commit();
+ }
+ }
+ catch (Throwable e)
+ {
+ _log.error("Failed to deliver message", e);
+ // we need to call before/afterDelivery as a pair
+ if (beforeDelivery)
+ {
+ try
+ {
+ _endpoint.afterDelivery();
+ }
+ catch (ResourceException e1)
+ {
+ _log.warn("Unable to call after delivery", e);
+ }
+ }
+ if (_useLocalTx || !_activation.isDeliveryTransacted())
+ {
+ try
+ {
+ _session.rollback();
+ }
+ catch (JMSException e1)
+ {
+ _log.warn("Unable to roll local transaction back", e1);
+ }
+ }
+ else
+ {
+ try
+ {
+ _session.recover() ;
+ }
+ catch (JMSException e1)
+ {
+ _log.warn("Unable to recover XA transaction", e1);
+ }
+ }
+ }
+
+ }
+
+}
Added: qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java?rev=1220336&view=auto
==============================================================================
--- qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java (added)
+++ qpid/trunk/qpid/java/jca/src/main/java/org/apache/qpid/ra/tm/GeronimoTransactionManagerLocator.java Sun Dec 18 05:09:07 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.ra.tm;
+
+import java.util.Set;
+
+import javax.transaction.TransactionManager;
+
+import org.apache.geronimo.gbean.AbstractName;
+import org.apache.geronimo.gbean.AbstractNameQuery;
+import org.apache.geronimo.kernel.Kernel;
+import org.apache.geronimo.kernel.KernelRegistry;
+
+public class GeronimoTransactionManagerLocator
+{
+
+ public GeronimoTransactionManagerLocator()
+ {
+ }
+
+ public TransactionManager getTransactionManager()
+ {
+ try
+ {
+ Kernel kernel = KernelRegistry.getSingleKernel();
+ AbstractNameQuery query = new AbstractNameQuery(TransactionManager.class.getName ());
+ Set<AbstractName> names = kernel.listGBeans(query);
+
+ if (names.size() != 1)
+ {
+ throw new IllegalStateException("Expected one transaction manager, not " + names.size());
+ }
+
+ AbstractName name = names.iterator().next();
+ TransactionManager transMg = (TransactionManager) kernel.getGBean(name);
+ return (TransactionManager)transMg;
+
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org