You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/07 15:14:24 UTC

svn commit: r515576 - in /activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra: ActiveMQActivationSpec.java ActiveMQResourceAdapter.java

Author: jstrachan
Date: Wed Mar  7 06:14:23 2007
New Revision: 515576

URL: http://svn.apache.org/viewvc?view=rev&rev=515576
Log:
Applied a modified version of AMQ-1147 (keeping the same names for the implementation classes to avoid issues with existing RA configurations)

Added:
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java   (with props)
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java   (with props)

Added: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java?view=auto&rev=515576
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java (added)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java Wed Mar  7 06:14:23 2007
@@ -0,0 +1,722 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.beans.IntrospectionException;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.resource.ResourceException;
+import javax.resource.spi.InvalidPropertyException;
+import javax.resource.spi.ResourceAdapter;
+
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.selector.SelectorParser;
+
+/**
+ * Configures the inbound JMS consumer specification using ActiveMQ
+ * 
+ * @org.apache.xbean.XBean element="activationSpec"
+ * 
+ * @version $Revision$ $Date$
+ */
+public class ActiveMQActivationSpec implements MessageActivationSpec, Serializable {
+
+    private static final long serialVersionUID = -7153087544100459975L;
+    
+    /** Auto-acknowledge constant for <code>acknowledgeMode</code> property **/
+    public static final String AUTO_ACKNOWLEDGE_MODE = "Auto-acknowledge";
+    /** Dups-ok-acknowledge constant for <code>acknowledgeMode</code> property * */
+    public static final String DUPS_OK_ACKNOWLEDGE_MODE = "Dups-ok-acknowledge";    
+    /** Durable constant for <code>subscriptionDurability</code> property * */
+    public static final String DURABLE_SUBSCRIPTION = "Durable";
+    /** NonDurable constant for <code>subscriptionDurability</code> property * */
+    public static final String NON_DURABLE_SUBSCRIPTION = "NonDurable";
+    
+    /**
+     * 
+     */
+    public static final int INVALID_ACKNOWLEDGE_MODE = -1;
+    
+    private transient MessageResourceAdapter resourceAdapter;
+    private String destinationType;
+    private String messageSelector;
+    private String destination;
+    private String acknowledgeMode = AUTO_ACKNOWLEDGE_MODE;
+    private String userName;
+    private String password;
+    private String clientId;
+    private String subscriptionName;
+    private String subscriptionDurability = NON_DURABLE_SUBSCRIPTION;    
+    private String noLocal = "false";
+    private String useRAManagedTransaction = "false";
+    private String maxSessions="10";
+    private String maxMessagesPerSessions="10";
+    private String enableBatch = "false";
+    private String maxMessagesPerBatch = "10";
+    private RedeliveryPolicy redeliveryPolicy;
+
+    
+    /**
+     * @see javax.resource.spi.ActivationSpec#validate()
+     */
+    public void validate() throws InvalidPropertyException {
+        List errorMessages = new ArrayList();
+        List propsNotSet = new ArrayList();
+        try {
+            if (!isValidDestination(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class));
+            if (!isValidDestinationType(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class));
+            if (!isValidAcknowledgeMode(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("acknowledgeMode", ActiveMQActivationSpec.class));
+            if (!isValidSubscriptionDurability(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("subscriptionDurability", ActiveMQActivationSpec.class));
+            if (!isValidClientId(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("clientId", ActiveMQActivationSpec.class));
+            if (!isValidSubscriptionName(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class));
+            if (!isValidMaxMessagesPerSessions(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("maxMessagesPerSessions", ActiveMQActivationSpec.class));
+            if (!isValidMaxSessions(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("maxSessions", ActiveMQActivationSpec.class));
+            if (!isValidMessageSelector(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("messageSelector", ActiveMQActivationSpec.class));
+            if (!isValidNoLocal(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("noLocal", ActiveMQActivationSpec.class));
+            if (!isValidUseRAManagedTransaction(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("useRAManagedTransaction", ActiveMQActivationSpec.class));
+            if (!isValidEnableBatch(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("enableBatch", ActiveMQActivationSpec.class));
+            if (!isValidMaxMessagesPerBatch(errorMessages))
+                propsNotSet.add(new PropertyDescriptor("maxMessagesPerBatch", ActiveMQActivationSpec.class));
+
+            
+        } catch (IntrospectionException e) {
+            e.printStackTrace();
+        }
+        
+        if (propsNotSet.size() > 0) {
+            StringBuffer b = new StringBuffer();
+            b.append("Invalid settings:");
+            for (Iterator iter = errorMessages.iterator(); iter.hasNext();) {
+                b.append(" ");
+                b.append(iter.next());
+            }
+            InvalidPropertyException e = new InvalidPropertyException(b.toString());
+            final PropertyDescriptor[] descriptors = (PropertyDescriptor[]) propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]);
+            e.setInvalidPropertyDescriptors(descriptors);
+            throw e;
+        }
+    }
+
+    private boolean isValidUseRAManagedTransaction(List errorMessages) {
+        try {
+            new Boolean(noLocal);
+            return true;
+        } catch (Throwable e) {
+        	//
+        }
+        errorMessages.add("noLocal must be set to: true or false.");
+        return false;
+    }
+
+    private boolean isValidNoLocal(List errorMessages) {
+        try {
+            new Boolean(noLocal);
+            return true;
+        } catch (Throwable e) {
+        	//
+        }
+        errorMessages.add("noLocal must be set to: true or false.");
+        return false;
+    }
+
+    private boolean isValidMessageSelector(List errorMessages) {
+        try {
+            if( !isEmpty(messageSelector) ) {
+                new SelectorParser().parse(messageSelector);
+            }
+            return true;
+        } catch (Throwable e) {
+            errorMessages.add("messageSelector not set to valid message selector: "+e.getMessage());
+            return false;
+        }
+    }
+
+    private boolean isValidMaxSessions(List errorMessages) {
+        try {
+            if( Integer.parseInt(maxSessions) > 0 ) {
+                return true;
+            }
+        } catch (NumberFormatException e) {
+        	//
+        }
+        errorMessages.add("maxSessions must be set to number > 0");
+        return false;
+    }
+
+    private boolean isValidMaxMessagesPerSessions(List errorMessages) {
+        try {
+            if( Integer.parseInt(maxMessagesPerSessions) > 0 ) {
+                return true;
+            }
+        } catch (NumberFormatException e) {
+        	//
+        }
+        errorMessages.add("maxMessagesPerSessions must be set to number > 0");
+        return false;
+    }
+
+    private boolean isValidMaxMessagesPerBatch(List errorMessages) {
+        try {
+            if( Integer.parseInt(maxMessagesPerBatch) > 0 ) {
+                return true;
+            }
+        } catch (NumberFormatException e) {
+        	//
+        }
+        errorMessages.add("maxMessagesPerBatch must be set to number > 0");
+        return false;
+    }
+
+    private boolean isValidEnableBatch(List errorMessages) {
+        try {
+            new Boolean(enableBatch);
+            return true;
+        } catch (Throwable e) {
+        	//
+        }
+        errorMessages.add("enableBatch must be set to: true or false");
+        return false;
+    }
+
+    /**
+     * @see javax.resource.spi.ResourceAdapterAssociation#getResourceAdapter()
+     */
+    public ResourceAdapter getResourceAdapter() {
+        return resourceAdapter;
+    }
+
+    /**
+     * @see javax.resource.spi.ResourceAdapterAssociation#setResourceAdapter(javax.resource.spi.ResourceAdapter)
+     */
+    public void setResourceAdapter(ResourceAdapter resourceAdapter) throws ResourceException {
+        //spec section 5.3.3
+        if (this.resourceAdapter != null) {
+            throw new ResourceException("ResourceAdapter already set");
+        }
+        if (!(resourceAdapter instanceof MessageResourceAdapter)) {
+            throw new ResourceException("ResourceAdapter is not of type: " + MessageResourceAdapter.class.getName());
+        }
+        this.resourceAdapter = (MessageResourceAdapter) resourceAdapter;
+    }
+
+
+    /////////////////////////////////////////////////////////////////////////
+    //
+    // Java Bean getters and setters for this ActivationSpec class.
+    //
+    /////////////////////////////////////////////////////////////////////////
+    /**
+     * @return Returns the destinationType.
+     */
+    public String getDestinationType() {
+        if (!isEmpty(destinationType)) {
+            return destinationType;
+        }
+        return null;
+    }
+
+    /**
+     * @param destinationType The destinationType to set.
+     */
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+    
+    /**
+     * 
+     */
+    public String getPassword() {
+        if (!isEmpty(password)) {                
+            return password;
+        }
+        return null;
+    }
+    
+    /**
+     * 
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * 
+     */
+    public String getUserName() {
+        if (!isEmpty(userName)) {        
+            return userName;
+        }
+        return null;
+    }
+    
+    /**
+     * 
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+    
+    /**
+     * @return Returns the messageSelector.
+     */
+    public String getMessageSelector() {
+        if (!isEmpty(messageSelector)) {
+            return messageSelector;
+        }
+        return null;
+    }
+
+    /**
+     * @param messageSelector The messageSelector to set.
+     */
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    /**
+     * @return Returns the noLocal.
+     */
+    public String getNoLocal() {
+        return noLocal;
+    }
+
+    /**
+     * @param noLocal The noLocal to set.
+     */
+    public void setNoLocal(String noLocal) {
+        if( noLocal!=null ) {
+            this.noLocal = noLocal;
+        }
+    }
+
+    /**
+     * 
+     */
+    public String getAcknowledgeMode() {
+        if (!isEmpty(acknowledgeMode)) {
+            return acknowledgeMode;
+        }
+        return null;
+    }
+
+    /**
+     * 
+     */
+    public void setAcknowledgeMode(String acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+    
+    /**
+     * 
+     */
+    public String getClientId() {
+        if (!isEmpty(clientId)) {
+            return clientId;
+        }
+        return null;
+    }
+
+    /**
+     * 
+     */
+    public void setClientId(String clientId) {        
+        this.clientId = clientId;            
+    }
+
+    /**
+     * 
+     */
+    public String getDestination() {
+        if (!isEmpty(destination)) {
+            return destination;
+        }
+        return null;
+    }
+
+    /**
+     * 
+     */
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * 
+     */
+    public String getSubscriptionDurability() {
+        if (!isEmpty(subscriptionDurability)) {                
+            return subscriptionDurability;
+        }
+        return null;
+    }
+
+    /**
+     * 
+     */
+    public void setSubscriptionDurability(String subscriptionDurability) {
+        this.subscriptionDurability = subscriptionDurability;
+    }
+
+    /**
+     * 
+     */
+    public String getSubscriptionName() {
+        if (!isEmpty(subscriptionName)) {
+            return subscriptionName;
+        }
+        return null;
+    }
+
+    /**
+     * 
+     */
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+    
+    private boolean isValidSubscriptionName(List errorMessages) {
+        if( !isDurableSubscription() ? true : subscriptionName != null && subscriptionName.trim().length() > 0 ) {
+            return true;
+        } 
+        errorMessages.add("subscriptionName must be set since durable subscription was requested.");
+        return false;
+    }
+
+    private boolean isValidClientId(List errorMessages) {
+        if( !isDurableSubscription() ? true : clientId != null && clientId.trim().length() > 0 ) {
+            return true;
+        }
+        errorMessages.add("clientId must be set since durable subscription was requested.");
+        return false;
+    }
+
+    /**
+     * 
+     */
+    public boolean isDurableSubscription() {
+        return DURABLE_SUBSCRIPTION.equals(subscriptionDurability);
+    }
+
+    private boolean isValidSubscriptionDurability(List errorMessages) {
+        // subscriptionDurability only applies to Topics
+        if ( DURABLE_SUBSCRIPTION.equals(subscriptionDurability) &&
+             getDestinationType() != null && !Topic.class.getName().equals(getDestinationType())) {
+            errorMessages.add("subscriptionDurability cannot be set to: "+DURABLE_SUBSCRIPTION+" when destinationType is set to "+
+                Queue.class.getName()+" as it is only valid when destinationType is set to "+Topic.class.getName()+".");
+            return false;
+        }
+        if (NON_DURABLE_SUBSCRIPTION.equals(subscriptionDurability) || DURABLE_SUBSCRIPTION.equals(subscriptionDurability))
+            return true;
+        errorMessages.add("subscriptionDurability must be set to: "+NON_DURABLE_SUBSCRIPTION+" or "+DURABLE_SUBSCRIPTION+".");
+        return false;
+    }
+
+    private boolean isValidAcknowledgeMode(List errorMessages) {
+        if (AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) || DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode))
+            return true;
+        errorMessages.add("acknowledgeMode must be set to: "+AUTO_ACKNOWLEDGE_MODE+" or "+DUPS_OK_ACKNOWLEDGE_MODE+".");
+        return false;
+    }
+
+    private boolean isValidDestinationType(List errorMessages) {
+        if (Queue.class.getName().equals(destinationType) || Topic.class.getName().equals(destinationType))
+            return true;
+        errorMessages.add("destinationType must be set to: "+Queue.class.getName()+" or "+Topic.class.getName()+".");
+        return false;
+    }
+
+    private boolean isValidDestination(List errorMessages) {
+        if(!(destination == null || destination.equals(""))) 
+            return true;
+        errorMessages.add("destination is a required field and must be set to the destination name.");
+        return false;
+    }
+     
+    private boolean isEmpty(String value) {
+        return value == null || "".equals(value.trim());
+    }
+
+    /**
+     * 
+     */
+   @Override
+public String toString() {
+        return "ActiveMQActivationSpec{" +
+                "acknowledgeMode='" + acknowledgeMode + "'" +
+                ", destinationType='" + destinationType + "'" +
+                ", messageSelector='" + messageSelector + "'" +
+                ", destination='" + destination + "'" +
+                ", clientId='" + clientId + "'" +
+                ", subscriptionName='" + subscriptionName + "'" +
+                ", subscriptionDurability='" + subscriptionDurability + "'" +
+                "}";
+    }
+
+   public int getAcknowledgeModeForSession() {
+        if( AUTO_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) {
+            return Session.AUTO_ACKNOWLEDGE;
+        } else if( DUPS_OK_ACKNOWLEDGE_MODE.equals(acknowledgeMode) ) {
+            return Session.DUPS_OK_ACKNOWLEDGE;
+        } else {
+            return INVALID_ACKNOWLEDGE_MODE;
+        }
+    }
+    
+    /**
+     * A helper method mostly for use in Dependency Injection containers
+     * which allows you to customize the destination and destinationType properties
+     * from a single ActiveMQDestination POJO
+     */
+    public void setActiveMQDestination(ActiveMQDestination destination) {
+        setDestination(destination.getPhysicalName());
+        if (destination instanceof Queue) {
+            setDestinationType(Queue.class.getName());
+        }
+        else {
+            setDestinationType(Topic.class.getName());
+        }
+    }
+
+    /**
+     * 
+     */
+    public ActiveMQDestination createDestination() {
+        if( isEmpty(destinationType) || isEmpty(destination) )
+            return null;
+        
+        ActiveMQDestination dest = null;
+        if (Queue.class.getName().equals(destinationType)) {
+            dest = new ActiveMQQueue(destination);
+        } else if (Topic.class.getName().equals(destinationType)) {
+            dest = new ActiveMQTopic(destination);
+        } else {
+            assert false : "Execution should never reach here";
+        }
+        return dest;
+    }
+
+    public String getMaxMessagesPerSessions() {
+        return maxMessagesPerSessions.toString();
+    }
+    
+    /**
+     * 
+     */
+    public void setMaxMessagesPerSessions(String maxMessagesPerSessions) {
+        if( maxMessagesPerSessions!=null ) {
+            this.maxMessagesPerSessions = maxMessagesPerSessions;
+        }
+    }    
+
+    /**
+     * 
+     */
+    public String getMaxSessions() {
+        return maxSessions;
+    }
+
+    /**
+     * 
+     */
+    public void setMaxSessions(String maxSessions) {
+        if( maxSessions!=null ) {
+            this.maxSessions = maxSessions;
+        }
+    }
+    
+    /**
+     * 
+     */
+    public String getUseRAManagedTransaction() {
+        return useRAManagedTransaction;
+    }
+    
+    /**
+     * 
+     */
+    public void setUseRAManagedTransaction(String useRAManagedTransaction) {
+        if( useRAManagedTransaction!=null ) {
+            this.useRAManagedTransaction = useRAManagedTransaction;
+        }
+    }
+
+    /**
+     * 
+     */
+    public int getMaxMessagesPerSessionsIntValue() {
+        return Integer.parseInt(maxMessagesPerSessions);
+    }
+
+    /**
+     * 
+     */
+    public int getMaxSessionsIntValue() {
+        return Integer.parseInt(maxSessions);
+    }
+
+    public boolean isUseRAManagedTransactionEnabled() {
+        return new Boolean(useRAManagedTransaction).booleanValue();
+    }
+
+    /**
+     * 
+     */
+    public boolean getNoLocalBooleanValue() {
+        return new Boolean(noLocal).booleanValue();
+    }
+
+    public String getEnableBatch() {
+        return enableBatch;
+    }
+
+    /**
+     * 
+     */
+    public void setEnableBatch(String enableBatch) {
+        if (enableBatch != null) {
+            this.enableBatch = enableBatch;
+        }
+    }
+
+    public boolean getEnableBatchBooleanValue() {
+        return new Boolean(enableBatch).booleanValue();
+    }
+
+    public int getMaxMessagesPerBatchIntValue() {
+        return Integer.parseInt(maxMessagesPerBatch);
+    }
+
+    public String getMaxMessagesPerBatch() {
+        return maxMessagesPerBatch.toString();
+    }
+
+    /**
+     * 
+     */
+    public void setMaxMessagesPerBatch(String maxMessagesPerBatch) {
+        if (maxMessagesPerBatch != null) {
+            this.maxMessagesPerBatch = maxMessagesPerBatch;
+        }
+    }
+
+    /**
+     * 
+     */
+    public short getBackOffMultiplier() {
+        if (redeliveryPolicy == null) {
+            return 0;
+        }
+        return redeliveryPolicy.getBackOffMultiplier();
+    }
+
+    /**
+     * 
+     */
+    public long getInitialRedeliveryDelay() {
+        if (redeliveryPolicy == null) {
+            return 0;
+        }
+        return redeliveryPolicy.getInitialRedeliveryDelay();
+    }
+
+    /**
+     * 
+     */
+    public int getMaximumRedeliveries() {
+        if (redeliveryPolicy == null) {
+            return 0;
+        }
+        return redeliveryPolicy.getMaximumRedeliveries();
+    }
+
+    /**
+     * 
+     */
+    public boolean isUseExponentialBackOff() {
+        if (redeliveryPolicy == null) {
+            return false;
+        }
+        return redeliveryPolicy.isUseExponentialBackOff();
+    }
+
+    /**
+     * 
+     */
+    public void setBackOffMultiplier(short backOffMultiplier) {
+        lazyCreateRedeliveryPolicy().setBackOffMultiplier(backOffMultiplier);
+    }
+
+    /**
+     * 
+     */
+    public void setInitialRedeliveryDelay(long initialRedeliveryDelay) {
+        lazyCreateRedeliveryPolicy().setInitialRedeliveryDelay(initialRedeliveryDelay);
+    }
+
+    /**
+     * 
+     */
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        lazyCreateRedeliveryPolicy().setMaximumRedeliveries(maximumRedeliveries);
+    }
+
+    /**
+     * 
+     */
+    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+        lazyCreateRedeliveryPolicy().setUseExponentialBackOff(useExponentialBackOff);
+    }
+
+    // don't use getter to avoid causing introspection errors in containers
+    /**
+     * 
+     */
+    public RedeliveryPolicy redeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+    
+    protected RedeliveryPolicy lazyCreateRedeliveryPolicy() {
+        if (redeliveryPolicy == null) {
+            redeliveryPolicy = new RedeliveryPolicy();
+        }
+        return redeliveryPolicy;
+    }
+}
+

Propchange: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQActivationSpec.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java?view=auto&rev=515576
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java (added)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java Wed Mar  7 06:14:23 2007
@@ -0,0 +1,593 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.resource.NotSupportedException;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.xa.XAResource;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Knows how to connect to one ActiveMQ server. It can then activate endpoints
+ * and deliver messages to those end points using the connection configured in the
+ * resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
+ *
+ * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
+ * description="The JCA Resource Adaptor for ActiveMQ"
+ *
+ * @version $Revision$
+ */
+public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable {
+
+    private static final long serialVersionUID = -5417363537865649130L;
+    private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
+
+    /** Workers for the currently activated endpoints, keyed by (endpoint factory, activation spec). */
+    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers =
+            new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
+    /** Connection settings; most bean properties of this class delegate to it. */
+    private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
+
+    private BootstrapContext bootstrapContext;
+    private String brokerXmlConfig;
+    /** Embedded broker, only created by {@link #start(BootstrapContext)} when brokerXmlConfig is non-blank. */
+    private BrokerService broker;
+    /** Optional externally supplied factory; see {@link #setConnectionFactory(ActiveMQConnectionFactory)}. */
+    private ActiveMQConnectionFactory connectionFactory;
+
+    /**
+     * Creates a resource adapter with a default connection request info and
+     * no embedded broker configuration.
+     */
+    public ActiveMQResourceAdapter() {
+        super();
+    }
+
+    /**
+     * Records the bootstrap context and, if a non-blank brokerXmlConfig has
+     * been set, creates and starts an embedded broker from that URI.
+     *
+     * @param bootstrapContext the context supplied by the container
+     * @throws ResourceAdapterInternalException if the embedded broker cannot
+     *             be created or started (any Throwable is wrapped)
+     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
+     */
+    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
+        this.bootstrapContext = bootstrapContext;
+        if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) {
+            try {
+                broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
+                broker.start();
+            } catch (Throwable e) {
+                throw new ResourceAdapterInternalException("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e);
+            }
+        }
+    }
+
+    /**
+     * Creates a connection using this adapter's own connection info. When a
+     * connection factory has been set it is used as-is; otherwise a factory
+     * is derived from the connection info.
+     *
+     * @return a new physical connection
+     * @throws JMSException if the connection cannot be created
+     * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
+     */
+    public ActiveMQConnection makeConnection() throws JMSException {
+        if (connectionFactory != null) {
+            return makeConnection(info, connectionFactory);
+        }
+        return makeConnection(info);
+    }
+
+    /**
+     * Creates a connection for the given connection info, using a factory
+     * obtained from {@code createConnectionFactory(info)}.
+     *
+     * @param info the connection settings to use
+     * @return a new physical connection
+     * @throws JMSException if the factory or the connection cannot be created
+     */
+    public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException {
+        ActiveMQConnectionFactory factory = createConnectionFactory(info);
+        return makeConnection(info, factory);
+    }
+
+    /**
+     * Creates a connection from the given factory with the user name and
+     * password taken from the given info. A non-empty client id from the info
+     * is applied to the new connection.
+     *
+     * @param info supplies user name, password and client id
+     * @param connectionFactory the factory used to create the connection
+     * @return a new physical connection
+     * @throws JMSException if the connection cannot be created or the client
+     *             id cannot be set
+     * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection(org.apache.activemq.ra.ActiveMQConnectionRequestInfo, org.apache.activemq.ActiveMQConnectionFactory)
+     */
+    public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException {
+        String userName = info.getUserName();
+        String password = info.getPassword();
+        ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
+
+        String clientId = info.getClientid();
+        if (clientId != null && clientId.length() > 0) {
+            physicalConnection.setClientID(clientId);
+        }
+        return physicalConnection;
+    }
+
+    /**
+     * Creates a connection for an activation spec. User name and password
+     * come from the spec, falling back to this adapter's info when the spec
+     * value is null. The spec's client id, if any, is set on the factory, and
+     * the spec's redelivery policy, if any, is set on the new connection.
+     *
+     * @param activationSpec the spec describing the inbound endpoint
+     * @return a new physical connection
+     * @throws JMSException if the factory or the connection cannot be created
+     */
+    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
+        ActiveMQConnectionFactory factory = createConnectionFactory(info);
+        String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
+        String password = defaultValue(activationSpec.getPassword(), info.getPassword());
+        String clientId = activationSpec.getClientId();
+        if (clientId != null) {
+            // NOTE(review): createConnectionFactory may hand back the shared
+            // connectionFactory instance (no copy) when the info carries no
+            // factory configuration, in which case this call changes the
+            // shared factory's client id -- confirm this is intended.
+            factory.setClientID(clientId);
+        }
+        else {
+            if (activationSpec.isDurableSubscription()) {
+                log.warn("No clientID specified for durable subscription: " + activationSpec);
+            }
+        }
+        ActiveMQConnection physicalConnection = (ActiveMQConnection) factory.createConnection(userName, password);
+
+        // have we configured a redelivery policy
+        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
+        if (redeliveryPolicy != null) {
+            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
+        }
+        return physicalConnection;
+    }
+
+    /**
+     * Produces the factory to use for the given connection info:
+     * <ul>
+     * <li>no shared factory set: a new factory;</li>
+     * <li>shared factory set and {@code info.isConnectionFactoryConfigured()}:
+     * a copy of the shared factory;</li>
+     * <li>otherwise: the shared factory itself.</li>
+     * </ul>
+     * In every case the result is passed to {@code info.configure(...)}
+     * before being returned.
+     *
+     * @param info the connection settings to apply to the factory
+     * @return the configured factory
+     * @throws JMSException propagated from the copy/configure calls, if they fail
+     */
+    private synchronized ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException {
+        ActiveMQConnectionFactory factory = connectionFactory;
+        if (factory != null && info.isConnectionFactoryConfigured()) {
+            factory = factory.copy();
+        }
+        else if (factory == null) {
+            factory = new ActiveMQConnectionFactory();
+        }
+        info.configure(factory);
+        return factory;
+    }
+
+    /**
+     * @return {@code value} unless it is null, in which case {@code defaultValue}
+     */
+    private String defaultValue(String value, String defaultValue) {
+        if (value != null)
+            return value;
+        return defaultValue;
+    }
+
+    /**
+     * Deactivates every registered endpoint, disposes of the embedded broker
+     * if one was started, and clears the bootstrap context.
+     *
+     * @see javax.resource.spi.ResourceAdapter#stop()
+     */
+    public void stop() {
+        // endpointDeactivation removes the entry, so the map shrinks each pass.
+        while (endpointWorkers.size() > 0) {
+            ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
+            endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
+        }
+        if (broker != null) {
+            ServiceSupport.dispose(broker);
+            broker = null;
+        }
+        this.bootstrapContext = null;
+    }
+
+    /**
+     * @return the bootstrap context passed to {@link #start(BootstrapContext)},
+     *         or null before start / after stop
+     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
+     */
+    public BootstrapContext getBootstrapContext() {
+        return bootstrapContext;
+    }
+
+    /**
+     * Registers and starts a worker for the given endpoint.
+     *
+     * @param endpointFactory the factory of the endpoint being activated
+     * @param activationSpec the spec for the endpoint; must be a
+     *            {@link MessageActivationSpec} bound to an adapter equal to
+     *            this one
+     * @throws ResourceException if the spec is not bound to this adapter
+     * @throws NotSupportedException if the spec is not a MessageActivationSpec
+     * @throws IllegalStateException if the same endpoint is already activated
+     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
+     *      javax.resource.spi.ActivationSpec)
+     */
+    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
+            throws ResourceException {
+
+        // spec section 5.3.3
+        if (!equals(activationSpec.getResourceAdapter())) {
+            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
+        }
+
+        if (!(activationSpec instanceof MessageActivationSpec)) {
+            throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
+        }
+
+        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
+                (MessageActivationSpec) activationSpec);
+        // This is weird.. the same endpoint activated twice.. must be a
+        // container error.
+        if (endpointWorkers.containsKey(key)) {
+            throw new IllegalStateException("Endpoint previously activated");
+        }
+
+        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
+
+        // NOTE(review): the worker is registered before start(); if start()
+        // throws, the entry stays in the map -- confirm that is intended.
+        endpointWorkers.put(key, worker);
+        worker.start();
+    }
+
+    /**
+     * Unregisters and stops the worker for the given endpoint. Specs that are
+     * not {@link MessageActivationSpec}s and endpoints that were never
+     * activated are silently ignored.
+     *
+     * @param endpointFactory the factory of the endpoint being deactivated
+     * @param activationSpec the spec the endpoint was activated with
+     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
+     *      javax.resource.spi.ActivationSpec)
+     */
+    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
+
+        if (activationSpec instanceof MessageActivationSpec) {
+            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec) activationSpec);
+            ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
+            if (worker == null) {
+                // This is weird.. that endpoint was not activated.. oh well..
+                // this method
+                // does not throw exceptions so just return.
+                return;
+            }
+            try {
+                worker.stop();
+            } catch (InterruptedException e) {
+                // We interrupted.. we won't throw an exception but will stop
+                // waiting for the worker
+                // to stop.. we tried our best. Keep trying to interrupt the
+                // thread.
+                Thread.currentThread().interrupt();
+            }
+
+        }
+
+    }
+
+    /**
+     * We only connect to one resource manager per ResourceAdapter instance, so
+     * any ActivationSpec will return the same XAResource.
+     * <p>
+     * A temporary connection is opened; if it is an {@link XAConnection} the
+     * XAResource of a new XA session is returned, otherwise an empty array.
+     * The temporary connection is closed before returning.
+     *
+     * @param activationSpecs not consulted by this implementation
+     * @return a single-element array, or an empty array when the connection
+     *         is not XA capable
+     * @throws ResourceException wrapping any JMSException raised while
+     *             creating the connection or session
+     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
+     */
+    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
+        Connection connection = null;
+        try {
+            connection = makeConnection();
+            if (connection instanceof XAConnection) {
+                XASession session = ((XAConnection) connection).createXASession();
+                XAResource xaResource = session.getXAResource();
+                // NOTE(review): the connection backing this resource is closed
+                // in the finally block below before the caller receives it --
+                // confirm the resource remains usable.
+                return new XAResource[] { xaResource };
+            }
+            return new XAResource[] {};
+        } catch (JMSException e) {
+            throw new ResourceException(e);
+        } finally {
+            // makeConnection() may have failed, leaving nothing to close.
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Throwable ignore) {
+                    // Best effort only: a failed close must not mask the result
+                    // or the original exception.
+                    log.debug("Ignoring failure while closing connection: " + ignore, ignore);
+                }
+            }
+        }
+    }
+
+    // ///////////////////////////////////////////////////////////////////////
+    //
+    // Java Bean getters and setters for this ResourceAdapter class.
+    //
+    // ///////////////////////////////////////////////////////////////////////
+
+    /**
+     * @return the client id from the connection info, or null when it is unset or empty
+     */
+    public String getClientid() {
+        return emptyToNull(info.getClientid());
+    }
+
+    /**
+     * @return the password from the connection info, or null when it is unset or empty
+     */
+    public String getPassword() {
+        return emptyToNull(info.getPassword());
+    }
+
+    /**
+     * @return the server URL from the connection info
+     */
+    public String getServerUrl() {
+        return info.getServerUrl();
+    }
+
+    /**
+     * @return the user name from the connection info, or null when it is unset or empty
+     */
+    public String getUserName() {
+        return emptyToNull(info.getUserName());
+    }
+
+    /**
+     * @param clientid the client id to store in the connection info
+     */
+    public void setClientid(String clientid) {
+        info.setClientid(clientid);
+    }
+
+    /**
+     * @param password the password to store in the connection info
+     */
+    public void setPassword(String password) {
+        info.setPassword(password);
+    }
+
+    /**
+     * @param url the server URL to store in the connection info
+     */
+    public void setServerUrl(String url) {
+        info.setServerUrl(url);
+    }
+
+    /**
+     * @param userid the user name to store in the connection info
+     */
+    public void setUserName(String userid) {
+        info.setUserName(userid);
+    }
+
+    /**
+     * @return the embedded broker configuration location, or null if none was set
+     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
+     */
+    public String getBrokerXmlConfig() {
+        return brokerXmlConfig;
+    }
+
+    /**
+     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
+     * configuration file </a> used to configure the ActiveMQ broker via Spring
+     * if using embedded mode.
+     *
+     * @param brokerXmlConfig
+     *            is the filename which is assumed to be on the classpath unless
+     *            a URL is specified. So a value of <code>foo/bar.xml</code>
+     *            would be assumed to be on the classpath whereas
+     *            <code>file:dir/file.xml</code> would use the file system.
+     *            Any valid URL string is supported.
+     */
+    public void setBrokerXmlConfig(String brokerXmlConfig) {
+        this.brokerXmlConfig=brokerXmlConfig;
+    }
+
+    /**
+     * @return durable topic prefetch from the connection info
+     */
+    public Integer getDurableTopicPrefetch() {
+        return info.getDurableTopicPrefetch();
+    }
+
+    /**
+     * @return initial redelivery delay from the connection info
+     */
+    public Long getInitialRedeliveryDelay() {
+        return info.getInitialRedeliveryDelay();
+    }
+
+    /**
+     * @return input stream prefetch from the connection info
+     */
+    public Integer getInputStreamPrefetch() {
+        return info.getInputStreamPrefetch();
+    }
+
+    /**
+     * @return maximum redeliveries from the connection info
+     */
+    public Integer getMaximumRedeliveries() {
+        return info.getMaximumRedeliveries();
+    }
+
+    /**
+     * @return queue browser prefetch from the connection info
+     */
+    public Integer getQueueBrowserPrefetch() {
+        return info.getQueueBrowserPrefetch();
+    }
+
+    /**
+     * @return queue prefetch from the connection info
+     */
+    public Integer getQueuePrefetch() {
+        return info.getQueuePrefetch();
+    }
+
+    /**
+     * @return redelivery backoff multiplier from the connection info
+     */
+    public Short getRedeliveryBackOffMultiplier() {
+        return info.getRedeliveryBackOffMultiplier();
+    }
+
+    /**
+     * @return redelivery use exponential backoff flag from the connection info
+     */
+    public Boolean getRedeliveryUseExponentialBackOff() {
+        return info.getRedeliveryUseExponentialBackOff();
+    }
+
+    /**
+     * @return topic prefetch from the connection info
+     */
+    public Integer getTopicPrefetch() {
+        return info.getTopicPrefetch();
+    }
+
+    /**
+     * @return the result of {@code isUseInboundSessionEnabled()} on the connection info
+     */
+    public boolean isUseInboundSessionEnabled() {
+        return info.isUseInboundSessionEnabled();
+    }
+
+    /**
+     * @param i value forwarded to {@code setAllPrefetchValues} on the connection info
+     */
+    public void setAllPrefetchValues(Integer i) {
+        info.setAllPrefetchValues(i);
+    }
+
+    /**
+     * @param durableTopicPrefetch durable topic prefetch to store in the connection info
+     */
+    public void setDurableTopicPrefetch(Integer durableTopicPrefetch) {
+        info.setDurableTopicPrefetch(durableTopicPrefetch);
+    }
+
+    /**
+     * @param value initial redelivery delay to store in the connection info
+     */
+    public void setInitialRedeliveryDelay(Long value) {
+        info.setInitialRedeliveryDelay(value);
+    }
+
+    /**
+     * @param inputStreamPrefetch input stream prefetch to store in the connection info
+     */
+    public void setInputStreamPrefetch(Integer inputStreamPrefetch) {
+        info.setInputStreamPrefetch(inputStreamPrefetch);
+    }
+
+    /**
+     * @param value maximum redeliveries to store in the connection info
+     */
+    public void setMaximumRedeliveries(Integer value) {
+        info.setMaximumRedeliveries(value);
+    }
+
+    /**
+     * @param queueBrowserPrefetch queue browser prefetch to store in the connection info
+     */
+    public void setQueueBrowserPrefetch(Integer queueBrowserPrefetch) {
+        info.setQueueBrowserPrefetch(queueBrowserPrefetch);
+    }
+
+    /**
+     * @param queuePrefetch queue prefetch to store in the connection info
+     */
+    public void setQueuePrefetch(Integer queuePrefetch) {
+        info.setQueuePrefetch(queuePrefetch);
+    }
+
+    /**
+     * @param value redelivery backoff multiplier to store in the connection info
+     */
+    public void setRedeliveryBackOffMultiplier(Short value) {
+        info.setRedeliveryBackOffMultiplier(value);
+    }
+
+    /**
+     * @param value redelivery exponential backoff flag to store in the connection info
+     */
+    public void setRedeliveryUseExponentialBackOff(Boolean value) {
+        info.setRedeliveryUseExponentialBackOff(value);
+    }
+
+    /**
+     * @param topicPrefetch topic prefetch to store in the connection info
+     */
+    public void setTopicPrefetch(Integer topicPrefetch) {
+        info.setTopicPrefetch(topicPrefetch);
+    }
+
+    /**
+     * @return Returns the info.
+     */
+    public ActiveMQConnectionRequestInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * Two adapters are equal when their connection infos are equal and their
+     * brokerXmlConfig values are equal (or both null).
+     *
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof MessageResourceAdapter)) {
+            return false;
+        }
+
+        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter) o;
+
+        if (!info.equals(activeMQResourceAdapter.getInfo())) {
+            return false;
+        }
+        if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig()) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @return true when exactly one argument is null, or when both are
+     *         non-null and {@code o1.equals(o2)} is false
+     */
+    private boolean notEqual(Object o1, Object o2) {
+        return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
+    }
+
+
+    /**
+     * Combines the hash of the connection info with that of brokerXmlConfig
+     * (when set), matching the fields compared by {@link #equals(Object)}.
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        int result;
+        result = info.hashCode();
+        if( brokerXmlConfig !=null ) {
+            result ^= brokerXmlConfig.hashCode();
+        }
+        return result;
+    }
+
+    /**
+     * @return null when {@code value} is null or empty, otherwise {@code value}
+     */
+    private String emptyToNull(String value) {
+        if (value == null || value.length() == 0) {
+            return null;
+        }
+        return value;
+    }
+
+    /**
+     * @return use inbound session flag from the connection info
+     */
+    public Boolean getUseInboundSession() {
+        return info.getUseInboundSession();
+    }
+
+    /**
+     * @param useInboundSession use inbound session flag to store in the connection info
+     */
+    public void setUseInboundSession(Boolean useInboundSession) {
+        info.setUseInboundSession(useInboundSession);
+    }
+
+    /**
+     * @return the shared connection factory, or null if none was set
+     * @see org.apache.activemq.ra.MessageResourceAdapter#getConnectionFactory()
+     */
+    public ActiveMQConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    /**
+     * This allows a connection factory to be configured and shared between a ResourceAdaptor and outbound messaging.
+     * Note that setting the connectionFactory will overload many of the properties on this POJO such as the redelivery
+     * and prefetch policies; the properties on the connectionFactory will be used instead.
+     *
+     * @param connectionFactory the factory to share, or null to fall back to
+     *            factories built from the connection info
+     */
+    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+
+}

Propchange: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain