You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beehive.apache.org by ek...@apache.org on 2005/05/18 22:24:51 UTC
svn commit: r170816 -
/incubator/beehive/trunk/system-controls/src/jms/org/apache/beehive/controls/system/jms/impl/JMSControlImpl.jcs
Author: ekoneil
Date: Wed May 18 13:24:51 2005
New Revision: 170816
URL: http://svn.apache.org/viewcvs?rev=170816&view=rev
Log:
JMS control bug fix for JIRA 747.
Also fix two lurking NPEs when adding properties or headers and providing a null Map.
Javadoc cleanup as well.
BB: self
DRT: Beehive pass
Modified:
incubator/beehive/trunk/system-controls/src/jms/org/apache/beehive/controls/system/jms/impl/JMSControlImpl.jcs
Modified: incubator/beehive/trunk/system-controls/src/jms/org/apache/beehive/controls/system/jms/impl/JMSControlImpl.jcs
URL: http://svn.apache.org/viewcvs/incubator/beehive/trunk/system-controls/src/jms/org/apache/beehive/controls/system/jms/impl/JMSControlImpl.jcs?rev=170816&r1=170815&r2=170816&view=diff
==============================================================================
--- incubator/beehive/trunk/system-controls/src/jms/org/apache/beehive/controls/system/jms/impl/JMSControlImpl.jcs (original)
+++ incubator/beehive/trunk/system-controls/src/jms/org/apache/beehive/controls/system/jms/impl/JMSControlImpl.jcs Wed May 18 13:24:51 2005
@@ -1,1095 +1,1101 @@
-/*
- * Copyright 2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * $Header:$
- */
-package org.apache.beehive.controls.system.jms.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.HashMap;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.ConnectionFactory;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.BytesMessage;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-
-import org.apache.beehive.controls.api.ControlException;
-import org.apache.beehive.controls.api.events.EventHandler;
-import org.apache.beehive.controls.api.context.ControlBeanContext;
-import org.apache.beehive.controls.api.context.Context;
-import org.apache.beehive.controls.api.context.ResourceContext;
-import org.apache.beehive.controls.api.bean.ControlImplementation;
-import org.apache.beehive.controls.api.bean.Extensible;
-import org.apache.beehive.controls.api.bean.Control;
-import org.apache.beehive.controls.system.jms.JMSControl;
-import org.apache.beehive.controls.system.jndi.JndiControlBean;
-import org.apache.xmlbeans.XmlObject;
-
-/**
- * Implementation of the jms control.
- */
-@ControlImplementation
-public class JMSControlImpl
- implements JMSControl, Extensible, java.io.Serializable {
-
- /**
- * @see JMSControl#getSession()
- */
- public Session getSession() throws ControlException
- {
- if (_session == null)
- {
- try
- {
- switch(getDestinationType())
- {
- case Auto:
- determineDestination();
- return getSession();
- case Topic:
- createTopicSession();
- break;
- case Queue:
- createQueueSession();
- break;
- }
-
- }
- catch(JMSException e)
- {
- throw new ControlException("Failure to get JMS connection or session",e);
- }
- }
- return _session;
- }
-
- /**
- * Get the jms-destination.
- *
- * @see org.apache.beehive.controls.system.jms.JMSControl#getDestination()
- */
- public javax.jms.Destination getDestination() throws ControlException
- {
- if (_destination == null)
- {
- _destination = (javax.jms.Destination) getJndiControl().getResource(
- getDestinationProperties().sendJndiName(), javax.jms.Destination.class);
-
- }
- return _destination;
- }
-
- /**
- * @see JMSControl#getConnection
- */
- public javax.jms.Connection getConnection() throws ControlException
- {
- getSession();
- return _connection;
- }
-
- /**
- * @see JMSControl#setHeaders
- */
- public void setHeaders(Map headers)
- {
- Map map = new HashMap();
- Iterator i = headers.keySet().iterator();
- while (i.hasNext())
- {
- Object name = i.next();
- Object value = headers.get(name);
- HeaderType type = null;
- /*
- * Allow for string valued or HeaderType valued
- * map entries.
- */
- if(name instanceof HeaderType)
- {
- type = (HeaderType)type;
- }
- else
- {
- if (name.equals(HeaderType.JMSCorrelationID.toString()))
- {
- type = HeaderType.JMSCorrelationID;
- }
- else if (name.equals(HeaderType.JMSDeliveryMode.toString()))
- {
- type = HeaderType.JMSDeliveryMode;
- }
- else if (name.equals(HeaderType.JMSExpiration.toString()))
- {
- type = HeaderType.JMSExpiration;
- }
- else if (name.equals(HeaderType.JMSMessageID.toString()))
- {
- type = HeaderType.JMSMessageID;
- }
- else if (name.equals(HeaderType.JMSPriority.toString()))
- {
- type = HeaderType.JMSPriority;
- }
- else if (name.equals(HeaderType.JMSRedelivered.toString()))
- {
- type = HeaderType.JMSRedelivered;
- }
- else if (name.equals(HeaderType.JMSTimestamp.toString()))
- {
- type = HeaderType.JMSTimestamp;
- }
- else if (name.equals(HeaderType.JMSType.toString()))
- {
- type = HeaderType.JMSType;
- }
- else
- {
- throw new IllegalArgumentException("Invalid JMS header type '"+name+"'");
- }
- }
- map.put(type,value);
- }
- Iterator iter = map.keySet().iterator();
- while(iter.hasNext())
- {
- Object key = iter.next();
- headers.put(key,map.get(key));
- }
- }
-
- /**
- * @see JMSControl#setHeader
- */
- public void setHeader(JMSControl.HeaderType type,Object value)
- {
- if(_headers == null)
- {
- _headers = new HashMap<HeaderType,Object>();
- }
- _headers.put(type,value);
- }
-
-
- /**
- * @see JMSControl#setProperties
- */
- public void setProperties(Map properties)
- {
- Iterator<String> i = properties.keySet().iterator();
- while (i.hasNext())
- {
- String name = i.next();
- Object value = properties.get(name);
- setProperty(name,value);
- }
- }
- /**
- * @see JMSControl#setProperty
- */
- public void setProperty(String name,Object value)
- {
- if(_properties == null)
- {
- _properties = new HashMap<String,Object>();
- }
- _properties.put(name,value);
- }
-
-
- /**
- * @see org.apache.beehive.controls.api.bean.Extensible#invoke
- */
- public Object invoke(Method method, Object[] args) throws ControlException
- {
- assert (method != null && args != null);
- javax.jms.Message m = null;
- boolean isBody = false;
- try
- {
- Destination props = getDestinationProperties();
- Session session = getSession();
- String jmsType = null;
- String correlationId = null;
- /*
- * Set the deliveryMode, priority and expiration.
- */
- int deliveryMode = getProducer().getDeliveryMode();
- int priority = getProducer().getPriority();
- long expiration = getProducer().getTimeToLive();
- Object mode = getHeader(HeaderType.JMSDeliveryMode);
- if(mode != null)
- {
- deliveryMode = deliveryModeToJmsMode(mode);
- }
- Integer v = getHeaderAsInteger(HeaderType.JMSPriority);
- if(v != null)
- {
- priority = v.intValue();
- }
- Long l = getHeaderAsLong(HeaderType.JMSExpiration);
- if(l != null)
- {
- expiration = l.longValue();
- }
- /*
- * Get the body of the message.
- */
- Object body = null;
- // Check to see if any parameter has annotation. If it doesn't then don't bother
- // checking them.
- boolean hasAnnotation = method.getParameterAnnotations().length > 0;
- for (int i= 0; i< args.length; i++)
- {
- if(hasAnnotation)
- {
- if (_context.getParameterPropertySet(method, i, Priority.class) != null)
- {
- continue;
- }
- if (_context.getParameterPropertySet(method, i, Property.class) != null)
- {
- continue;
- }
- if (_context.getParameterPropertySet(method, i, Expiration.class) != null)
- {
- continue;
- }
- if (_context.getParameterPropertySet(method, i, Delivery.class) != null)
- {
- continue;
- }
- if (_context.getParameterPropertySet(method, i, Type.class) != null)
- {
- continue;
- }
- if (_context.getParameterPropertySet(method, i, CorrelationId.class) != null)
- {
- continue;
- }
- }
- if(isBody)
- {
- throw new IllegalArgumentException("At most one parameter may be defined as the body of the JMS message");
- }
- body = args[i];
- isBody = true;
- }
- /*
- * Get the method level annotation properties.
- */
- Priority pri = _context.getMethodPropertySet(method,Priority.class);
- if(pri != null && pri.value() != -1)
- {
- priority = pri.value();
- }
- Expiration exp = _context.getMethodPropertySet(method,Expiration.class);
- if(exp != null && exp.value() != -1L)
- {
- expiration = exp.value();
- }
- Delivery del = _context.getMethodPropertySet(method,Delivery.class);
- if(del != null && del.value() != DeliveryMode.Auto)
- {
- deliveryMode = deliveryModeToJmsMode(del.value());
- }
- Type t = _context.getMethodPropertySet(method,Type.class);
- if(t != null && t.value().length() != 0)
- {
- jmsType = t.value();
- }
- CorrelationId id = _context.getMethodPropertySet(method,CorrelationId.class);
- if(id != null && id.value().length() != 0)
- {
- correlationId = id.value();
- }
- /*
- * Create a message of the appropriate type and set the body.
- */
- JMSControl.Message mess = _context.getMethodPropertySet(method,JMSControl.Message.class);
- MessageType type = MessageType.Auto;
- if(mess != null)
- {
- type = mess.value();
- }
- if(type.equals(MessageType.Auto))
- {
- if(body instanceof byte[])
- {
- type = MessageType.Bytes;
- }
- else if(body instanceof Map || !isBody)
- {
- type = MessageType.Map;
- }
- else if(body instanceof String || body instanceof XmlObject)
- {
- type = MessageType.Text;
- }
- else if(body instanceof javax.jms.Message)
- {
- type = MessageType.JMSMessage;
- }
- else if(body instanceof Serializable)
- {
- type = MessageType.Object;
- }
- else
- {
- throw new ControlException("Cannot determine message type from body");
- }
- }
- switch(type)
- {
- case Object:
- checkBody(body,Serializable.class);
- m = session.createObjectMessage((Serializable)body);
- break;
- case Bytes:
- BytesMessage sm;
- checkBody(body,byte[].class);
- m = sm = session.createBytesMessage();
- sm.writeBytes((byte[])body);
- break;
- case Text:
- if (body instanceof XmlObject)
- {
- body = (((XmlObject)body)).xmlText();
- }
- checkBody(body,String.class);
- m = session.createTextMessage((String)body);
- break;
- case Map:
- MapMessage mm;
- checkBody(body,Map.class);
- m = mm = session.createMapMessage();
- Map map =(Map)body;
- Iterator iter = map.keySet().iterator();
- while(iter.hasNext())
- {
- String key = (String)iter.next();
- mm.setObject(key,map.get(key));
- }
- break;
- case JMSMessage:
- checkBody(body,javax.jms.Message.class);
- m = (javax.jms.Message)body;
- break;
- }
- /*
- * Set the correlation id if given.
- */
- String correlationProp = props.sendCorrelationProperty();
- if(correlationProp != null && correlationProp.length() == 0)
- {
- correlationProp = null;
- }
- if(correlationId == null)
- {
- correlationId = getHeaderAsString(HeaderType.JMSCorrelationID);
- }
- Properties jmsProps =
- (Properties)_context.getMethodPropertySet(method,Properties.class);
- if(jmsProps != null && jmsProps.value() != null)
- {
- PropertyValue[] jprops = jmsProps.value();
- for(int i = 0; i < jprops.length; i++)
- {
- PropertyValue jprop = jprops[i];
- Class cls = jprop.type();
- if(cls.equals(String.class))
- {
- m.setStringProperty(jprop.name(),jprop.value());
- }
- else if(cls.equals(Integer.class))
- {
- m.setIntProperty(jprop.name(),valueAsInteger(jprop.value()));
- }
- else if(cls.equals(Long.class))
- {
- m.setLongProperty(jprop.name(),valueAsLong(jprop.value()));
- }
- else
- {
- throw new IllegalArgumentException("Invalid type for property-value");
- }
- }
- }
- /*
- * Set the properties/headers of the message from
- * the parameters to the invoke.
- */
- if(hasAnnotation)
- {
- for (int i= 0; i< args.length; i++)
- {
- Property jmsProp =
- (Property)_context.getParameterPropertySet(method,i, Property.class);
- if (jmsProp != null)
- {
- m.setObjectProperty(jmsProp.name(), args[i]);
- }
- Priority jmsPriority =
- (Priority)_context.getParameterPropertySet(method,i, Priority.class);
- if (jmsPriority != null)
- {
- Integer p = valueAsInteger(args[i]);
- if(p != null)
- {
- priority = p.intValue();
- }
- }
- Expiration jmsExpiration =
- (Expiration)_context.getParameterPropertySet(method,i, Expiration.class);
- if (jmsExpiration != null)
- {
- Long e = valueAsLong(args[i]);
- if(e != null)
- {
- expiration = e.longValue();
- }
- }
- Delivery jmsDelivery =
- (Delivery)_context.getParameterPropertySet(method,i, Delivery.class);
- if (jmsDelivery != null && args[i] != null)
- {
- deliveryMode = deliveryModeToJmsMode(args[i]);
- }
- t =
- (Type)_context.getParameterPropertySet(method,i, Type.class);
- if (t != null && args[i] != null)
- {
- jmsType = (String)args[i];
- }
- CorrelationId jmsId =
- (CorrelationId)_context.getParameterPropertySet(method,i, CorrelationId.class);
- if (jmsId != null && args[i] != null)
- {
- correlationId = (String)args[i];
- }
- }
- }
- if(correlationProp != null)
- {
- m.setStringProperty(correlationProp,correlationId);
- }
- else
- {
- m.setJMSCorrelationID(correlationId);
- }
- /*
- * Set the headers and properties from maps provided
- * by setProperties() and setHeaders()
- */
- m.setJMSExpiration(expiration);
- m.setJMSDeliveryMode(deliveryMode);
- m.setJMSPriority(priority);
- setMessageHeaders(m);
- setMessageProperties(m);
- expiration = m.getJMSExpiration();
- deliveryMode = m.getJMSDeliveryMode();
- priority = m.getJMSPriority();
- _headers = null;
- _properties = null;
- /*
- * Send the message.
- */
- switch(getDestinationType())
- {
- case Topic:
- ((TopicPublisher)getProducer()).publish(m,deliveryMode,priority,expiration);
- break;
- case Queue:
- ((QueueSender)getProducer()).send(m,deliveryMode,priority,expiration);
- break;
- }
- }
-
- catch(JMSException e)
- {
- throw new ControlException("Error in sending message",e);
- }
- return m;
- }
-
-
- @EventHandler(field = "_resourceContext", eventSet = ResourceContext.ResourceEvents.class, eventName = "onAcquire")
- public void onAcquire()
- {
- }
- /*
- * The onRelease event handler for the associated context. This method will
- * release all resources acquired by onAcquire.
- */
- @EventHandler(field = "_resourceContext", eventSet = ResourceContext.ResourceEvents.class, eventName = "onRelease")
- public void onRelease()
- {
- close();
- }
-
- /* Protected Method(s) */
-
- /**
- * Close any resources.
- */
- protected void close()
- {
- try
- {
- if (_producer != null)
- {
- _producer.close();
- _producer = null;
- }
- if (_session != null)
- {
- _session.close();
- _session = null;
- }
- if (_connection != null)
- {
- _connection.close();
- _connection = null;
- }
- } catch (JMSException jmse)
- {
- throw new ControlException("Unable to release JMS resource", jmse);
- }
- }
-
- /**
- * Determine whether we are working with a queue or a topic.
- */
- protected void determineDestination()
- {
- Destination props = getDestinationProperties();
- String factory = props.jndiConnectionFactory();
- ConnectionFactory cfactory = (ConnectionFactory) getJndiControl().getResource(factory,ConnectionFactory.class);
- if(cfactory instanceof QueueConnectionFactory && cfactory instanceof TopicConnectionFactory)
- {
- javax.jms.Destination dest = getDestination();
- if(dest instanceof Queue && dest instanceof Topic)
- {
- /* Try to create a topic producer...if fail then assume that it is a queue */
- try
- {
- createTopicSession();
- _producer = ((TopicSession)getSession()).createPublisher((Topic) getDestination());
- }
- catch(Exception e)
- {
- close();
- _destinationType = DestinationType.Queue;
- return;
- }
- _destinationType = DestinationType.Topic;
- }
- else
- {
- if(dest instanceof javax.jms.Queue)
- {
- _destinationType = DestinationType.Queue;
- }
- if(dest instanceof javax.jms.Topic)
- {
- _destinationType = DestinationType.Topic;
- }
- }
- }
- else
- {
- if(cfactory instanceof QueueConnectionFactory)
- {
- _destinationType = DestinationType.Queue;
- }
- if(cfactory instanceof TopicConnectionFactory)
- {
- _destinationType = DestinationType.Topic;
- }
- }
- }
-
- /**
- * Get the queue/topic producer.
- * @return the JMS producer.
- */
- protected MessageProducer getProducer()
- {
- if(_producer == null)
- {
- //
- // Acquire the publisher/sender.
- //
- try
- {
- javax.jms.Session sess = getSession();
- switch(getDestinationType())
- {
- case Auto:
- try
- {
- _producer = ((QueueSession)sess).createSender((Queue) getDestination());
- }
- catch(Exception e)
- {
- _producer = ((TopicSession)sess).createPublisher((Topic) getDestination());
- }
- break;
- case Topic:
- _producer = ((TopicSession)sess).createPublisher((Topic) getDestination());
- break;
- case Queue:
- _producer = ((QueueSession)sess).createSender((Queue) getDestination());
- break;
- }
- } catch (JMSException jmse)
- {
- throw new ControlException("Unable to acquire JMS resource", jmse);
- }
- }
- return _producer;
- }
-
-
- /**
- * Creates a topic session.
- */
- protected void createTopicSession() throws JMSException
- {
- Destination props = getDestinationProperties();
- String factory = props.jndiConnectionFactory();
- boolean transacted = props.transacted();
- AcknowledgeMode ackMode = props.acknowledgeMode();
- TopicConnectionFactory connFactory = (TopicConnectionFactory) getJndiControl()
- .getResource(factory,
- TopicConnectionFactory.class);
- _connection = connFactory.createTopicConnection();
- _session = ((TopicConnection) _connection).createTopicSession(
- transacted, modeToJmsMode(ackMode));
-
- }
-
-
- /**
- * Creates a queue session.
- */
- protected void createQueueSession() throws JMSException
- {
- Destination props = getDestinationProperties();
- String factory = props.jndiConnectionFactory();
- boolean transacted = props.transacted();
- AcknowledgeMode ackMode = props.acknowledgeMode();
- QueueConnectionFactory connFactory = (QueueConnectionFactory) getJndiControl()
- .getResource(factory,
- QueueConnectionFactory.class);
- _connection = connFactory.createQueueConnection();
- _session = ((QueueConnection) _connection).createQueueSession(
- transacted, modeToJmsMode(ackMode));
-
- }
- /**
- * Convert the enum to the JMS ack mode.
- * @param mode the enum mode.
- * @return the JMS mode.
- */
- protected int modeToJmsMode(AcknowledgeMode mode)
- {
- if (mode == AcknowledgeMode.Auto)
- {
- return Session.AUTO_ACKNOWLEDGE;
- } else if (mode == AcknowledgeMode.Client)
- {
- return Session.CLIENT_ACKNOWLEDGE;
- } else
- {
- return Session.DUPS_OK_ACKNOWLEDGE;
- }
- }
- /**
- * Convert the object to the JMS delivery mode.
- * @param value an integer-valued string, a number, or a DeliveryMode.
- * @return the JMS delivery mode.
- */
- protected int deliveryModeToJmsMode(Object value)
- {
- if(value instanceof DeliveryMode)
- {
- DeliveryMode mode = (DeliveryMode)value;
- switch(mode)
- {
- case Persistent:
- return javax.jms.DeliveryMode.PERSISTENT;
- case NonPersistent:
- return javax.jms.DeliveryMode.NON_PERSISTENT;
- }
- }
- if (value instanceof Number)
- {
- return ((Number)value).intValue();
- }
- else if (value instanceof String)
- {
- return Integer.parseInt((String)value);
- }
- throw new IllegalArgumentException("Invalid delivery mode value");
- }
- /**
- * Check the given value to see if it is appropriate for the given message value class.
- * @param value the body.
- * @param cls the expected class.
- */
- protected void checkBody(Object value,Class cls) throws ControlException
- {
- if(!cls.isInstance(value))
- {
- throw new ControlException("Message body is not of correct type expected "+cls.getName());
- }
- }
- /**
- * Get the destination annotation info for the message.
- * @return the destination method annotation.
- */
- protected Destination getDestinationProperties()
- {
- return (Destination) _context.getControlPropertySet(Destination.class);
- }
-
- /**
- * Get the topic/queue state.
- * Determine what connection and session from the destination
- * itself if sendType is Auto otherwise use the Destination
- * annotation element.
- * @return the destination type, which is still Auto if the annotation's sendType is Auto.
- */
- protected DestinationType getDestinationType() throws ControlException
- {
- if(_destinationType != DestinationType.Auto)
- {
- return _destinationType;
- }
- Destination d = getDestinationProperties();
- if(d.sendType() != DestinationType.Auto)
- {
- _destinationType = d.sendType();
- }
- return d.sendType();
- }
-
-
-
-
- /**
- * Set the properties of the given message.
- * @param msg the message.
- */
- protected void setMessageProperties(javax.jms.Message msg)
- throws ControlException
- {
- if(_properties == null)
- {
- return;
- }
- Iterator i = _properties.keySet().iterator();
- while (i.hasNext())
- {
- String name = (String) i.next();
- Object value = _properties.get(name);
- try
- {
- msg.setObjectProperty(name,value);
- }
- catch(JMSException e)
- {
- throw new ControlException("Cannot set property '"+name+"' into JMS message");
- }
- }
- }
-
- /**
- * Set the headers. Accessing msg may throw exception.
- * @param msg the message.
- */
- protected void setMessageHeaders(javax.jms.Message msg)
- throws ControlException
- {
- if(_headers == null)
- {
- return;
- }
- Iterator<HeaderType> i = _headers.keySet().iterator();
- while (i.hasNext())
- {
- HeaderType name = i.next();
- Object value = _headers.get(name);
- setMessageHeader(msg,name,value);
- }
- }
- /**
- * Set the header value of the given message.
- * @param msg the message.
- * @param type the header type.
- * @param value the value.
- */
- protected void setMessageHeader(javax.jms.Message msg,HeaderType type,Object value)
- {
- switch(type)
- {
-
- case JMSCorrelationID:
- try
- {
- if (value instanceof byte[])
- {
- msg.setJMSCorrelationIDAsBytes((byte[])value);
- }
- else if (value instanceof String)
- {
- msg.setJMSCorrelationID((String)value);
- }
- }
- catch (javax.jms.JMSException e)
- {
- throw new ControlException("Error setting JMSCorrelationID for message", e);
- }
- break;
- case JMSPriority:
- try
- {
- if (value instanceof Integer)
- {
- msg.setJMSPriority(((Integer)value).intValue());
- }
- else if (value instanceof String)
- {
- msg.setJMSPriority(Integer.getInteger((String)value));
- }
- }
- catch (javax.jms.JMSException e)
- {
- throw new ControlException("Error setting JMSPriority for message", e);
- }
- break;
- case JMSExpiration:
- try
- {
- if (value instanceof Long)
- {
- msg.setJMSExpiration(((Long)value).longValue());
- }
- else if (value instanceof String)
- {
- msg.setJMSExpiration(Long.parseLong((String)value));
- }
- } catch (javax.jms.JMSException e)
- {
- throw new ControlException("Error setting JMSExpiration for message", e);
- }
- break;
- case JMSType:
- try
- {
- msg.setJMSType((String)value);
- }
- catch (javax.jms.JMSException e)
- {
- throw new ControlException("Error setting JMSType for message", e);
- }
- break;
- }
- }
-
- /**
- * Get the header type value.
- * @param name the header type.
- * @return the value or null.
- */
- protected Object getHeader(HeaderType name)
- {
- if(_headers == null)
- {
- return null;
- }
- return _headers.get(name.toString());
- }
- /**
- * Get the header type value as a string.
- * @param name the header type.
- * @return the string value or null.
- */
- protected String getHeaderAsString(HeaderType name)
- {
- Object obj = getHeader(name);
- if(obj != null)
- {
- return obj.toString();
- }
- return null;
- }
-
- /**
- * Get the header type value as an int.
- * @param name the header type.
- * @return the int value or null.
- */
- protected Integer getHeaderAsInteger(HeaderType name)
- {
- Object obj = getHeader(name);
- return valueAsInteger(getHeader(name));
- }
- /**
- * Return the given value as an integer.
- * @param obj a string or number object.
- * @return the integer value or null.
- */
- protected Integer valueAsInteger(Object obj)
- {
- if(obj instanceof String)
- {
- return Integer.parseInt((String)obj);
- }
- else if(obj instanceof Integer)
- {
- return (Integer)obj;
- }
- else if(obj instanceof Number)
- {
- return new Integer(((Number)obj).intValue());
- }
- return null;
- }
- /**
- * Get the header type value as a long.
- * @param name the header type.
- * @return the long value or null.
- */
- protected Long getHeaderAsLong(HeaderType name)
- {
- return valueAsLong(getHeader(name));
- }
- /**
- * Return the given value as a long.
- * @param obj a string or number object.
- * @return the long value or null.
- */
- protected Long valueAsLong(Object obj)
- {
- if(obj instanceof String)
- {
- return Long.parseLong((String)obj);
- }
- else if(obj instanceof Long)
- {
- return (Long)obj;
- }
- else if(obj instanceof Number)
- {
- return new Long(((Number)obj).longValue());
- }
- return null;
- }
- /**
- * Get the JNDI control instance.
- * @return the jndi-control.
- */
- protected JndiControlBean getJndiControl()
- {
- if(!_jndiInitialized)
- {
- _jndiInitialized = true;
- Destination dest = getDestinationProperties();
- String contextFactory = nullIfEmpty(dest.jndiContextFactory());
- if(contextFactory != null)
- {
- _jndiControl.setFactory(contextFactory);
- }
- String providerURL = nullIfEmpty(dest.jndiProviderURL());
- if(providerURL != null)
- {
- _jndiControl.setUrl(providerURL);
- }
- String userName = nullIfEmpty(dest.jndiUsername());
- if(userName != null)
- {
- _jndiControl.setJndiSecurityPrincipal(userName);
- }
- String password = nullIfEmpty(dest.jndiPassword());
- if(password != null)
- {
- _jndiControl.setJndiSecurityCredentials(password);
- }
- }
- return _jndiControl;
- }
-
- /**
- * Return null if the given string is null or an empty string.
- * @param str a string.
- * @return null or the string.
- */
- protected String nullIfEmpty(String str)
- {
- if(str == null || str.trim().length() == 0)
- {
- return null;
- }
- return str;
- }
- @Context
- private ResourceContext _resourceContext;
-
- @Context
- private ControlBeanContext _context;
-
- @Control
- private JndiControlBean _jndiControl;
-
- /** The destination */
- private transient javax.jms.Destination _destination;
-
- private transient DestinationType _destinationType = DestinationType.Auto;
-
- /** The connection. */
- private transient javax.jms.Connection _connection;
-
- /** The session */
- private transient Session _session;
-
- /** The message producer. */
- private transient MessageProducer _producer;
-
- /** The JNDI control has been initialized */
- private boolean _jndiInitialized = false;
-
- /** The headers to set in the next message to be sent. */
- private Map<HeaderType,Object> _headers;
-
- /** The properties to set in the next message to be sent. */
- private Map<String,Object> _properties;
-
-}
+/*
+ * Copyright 2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Header:$
+ */
+package org.apache.beehive.controls.system.jms.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.ConnectionFactory;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.BytesMessage;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+import org.apache.beehive.controls.api.ControlException;
+import org.apache.beehive.controls.api.events.EventHandler;
+import org.apache.beehive.controls.api.context.ControlBeanContext;
+import org.apache.beehive.controls.api.context.Context;
+import org.apache.beehive.controls.api.context.ResourceContext;
+import org.apache.beehive.controls.api.bean.ControlImplementation;
+import org.apache.beehive.controls.api.bean.Extensible;
+import org.apache.beehive.controls.api.bean.Control;
+import org.apache.beehive.controls.system.jms.JMSControl;
+import org.apache.beehive.controls.system.jndi.JndiControlBean;
+import org.apache.xmlbeans.XmlObject;
+
+/**
+ * <p>
+ * Implementation of the {@link JMSControl}.
+ * </p>
+ */
+@ControlImplementation
+public class JMSControlImpl
+ implements JMSControl, Extensible, java.io.Serializable {
+
+ /**
+ * Implementation of the {@link org.apache.beehive.controls.system.jms.JMSControl#getSession()} method.
+ * @return the {@link Session}
+ * @throws ControlException when an error occurs trying to create a JMS session
+ */
+ public Session getSession() throws ControlException
+ {
+ if (_session == null)
+ {
+ try
+ {
+ switch(getDestinationType())
+ {
+ case Auto:
+ determineDestination();
+ return getSession();
+ case Topic:
+ createTopicSession();
+ break;
+ case Queue:
+ createQueueSession();
+ break;
+ }
+
+ }
+ catch(JMSException e)
+ {
+ throw new ControlException("Failure to get JMS connection or session",e);
+ }
+ }
+ return _session;
+ }
+
+ /**
+ * Get the JMS {@link javax.jms.Destination}. Implementation of the
+ * {@link org.apache.beehive.controls.system.jms.JMSControl#getDestination()} method.
+ */
+ public javax.jms.Destination getDestination() throws ControlException
+ {
+ if (_destination == null)
+ {
+ _destination = (javax.jms.Destination)getJndiControl().getResource(
+ getDestinationProperties().sendJndiName(), javax.jms.Destination.class);
+ }
+ return _destination;
+ }
+
+ /**
+ * Get the JMS {@link javax.jms.Connection}. Implementation of the
+ * {@link JMSControl#getConnection()}.
+ */
+ public javax.jms.Connection getConnection() throws ControlException
+ {
+ getSession();
+ return _connection;
+ }
+
+ /**
+  * Register a set of JMS headers for the next message to be sent.
+  * <p>
+  * Each key may be a {@link JMSControl.HeaderType} or the string form of one of the header types
+  * listed in the method body. All keys are resolved before any header is stored, so an
+  * unrecognized key leaves the pending headers untouched.
+  *
+  * @param headers header keys mapped to their values; <code>null</code> is ignored.
+  * @throws IllegalArgumentException if a key does not name a recognized header type.
+  * @see JMSControl#setHeaders
+  */
+ public void setHeaders(Map headers)
+ {
+     if (headers == null)
+     {
+         return;
+     }
+
+     // Header types that may be given by their string name, tried in this order.
+     HeaderType[] byName = {
+         HeaderType.JMSCorrelationID,
+         HeaderType.JMSDeliveryMode,
+         HeaderType.JMSExpiration,
+         HeaderType.JMSMessageID,
+         HeaderType.JMSPriority,
+         HeaderType.JMSRedelivered,
+         HeaderType.JMSTimestamp,
+         HeaderType.JMSType
+     };
+
+     // Stage the resolved entries so a bad key aborts before anything is stored.
+     HashMap<JMSControl.HeaderType,Object> staged = new HashMap<JMSControl.HeaderType,Object>();
+     for (Object name : headers.keySet())
+     {
+         Object value = headers.get(name);
+         HeaderType type = null;
+         if (name instanceof HeaderType)
+         {
+             type = (HeaderType)name;
+         }
+         else
+         {
+             for (int k = 0; k < byName.length && type == null; k++)
+             {
+                 if (name.equals(byName[k].toString()))
+                 {
+                     type = byName[k];
+                 }
+             }
+             if (type == null)
+             {
+                 throw new IllegalArgumentException("Invalid JMS header type '"+name+"'");
+             }
+         }
+         staged.put(type,value);
+     }
+
+     for (JMSControl.HeaderType key : staged.keySet())
+     {
+         setHeader(key,staged.get(key));
+     }
+ }
+
+ /**
+  * Register a single JMS header for the next message to be sent.
+  *
+  * @param type the header to set.
+  * @param value the header value.
+  * @see JMSControl#setHeader
+  */
+ public void setHeader(JMSControl.HeaderType type,Object value)
+ {
+     Map<HeaderType,Object> pending = _headers;
+     if (pending == null)
+     {
+         // The map is created lazily on the first header.
+         pending = new HashMap<HeaderType,Object>();
+         _headers = pending;
+     }
+     pending.put(type,value);
+ }
+
+
+ /**
+  * Register a set of JMS message properties for the next message to be sent.
+  * Every key of the map is expected to be a {@link String}.
+  *
+  * @param properties property names mapped to values; <code>null</code> is ignored.
+  * @see JMSControl#setProperties
+  */
+ public void setProperties(Map properties)
+ {
+     if (properties != null)
+     {
+         for (Object key : properties.keySet())
+         {
+             String name = (String)key;
+             setProperty(name,properties.get(name));
+         }
+     }
+ }
+ /**
+  * Register a single JMS message property for the next message to be sent.
+  *
+  * @param name the property name.
+  * @param value the property value.
+  * @see JMSControl#setProperty
+  */
+ public void setProperty(String name,Object value)
+ {
+     Map<String,Object> pending = _properties;
+     if (pending == null)
+     {
+         // The map is created lazily on the first property.
+         pending = new HashMap<String,Object>();
+         _properties = pending;
+     }
+     pending.put(name,value);
+ }
+
+
+ /**
+ * Implementation of the {@link Extensible#invoke(java.lang.reflect.Method, Object[])} method. This
+ * method allows extension by interface.
+ * <p>
+ * Builds a JMS message from the arguments of the invoked method and sends it through the producer.
+ * Delivery mode, priority and expiration start from the producer's current settings and are then
+ * overridden, in this order, by the pending headers, by method-level annotations and by annotated
+ * parameters. The pending header map is finally applied to the message itself and the three
+ * values are read back from the message before sending.
+ *
+ * @param method the invoked method; must not be null.
+ * @param args the arguments of the invocation; must not be null.
+ * @return the JMS message that was sent.
+ * @throws ControlException if a JMSException is raised while building or sending the message, or
+ * if the message body does not match the message type.
+ * @throws IllegalArgumentException if more than one argument qualifies as the message body, or if
+ * a property-value annotation declares a type other than String, Integer or Long.
+ */
+ public Object invoke(Method method, Object[] args) throws ControlException
+ {
+ assert (method != null && args != null);
+ javax.jms.Message m = null;
+ boolean isBody = false;
+ try
+ {
+ Destination props = getDestinationProperties();
+ Session session = getSession();
+ String jmsType = null;
+ String correlationId = null;
+ /*
+ * Set the deliveryMode, priority and expiration.
+ */
+ int deliveryMode = getProducer().getDeliveryMode();
+ int priority = getProducer().getPriority();
+ long expiration = getProducer().getTimeToLive();
+ // Values registered through setHeader()/setHeaders() override the producer settings.
+ Object mode = getHeader(HeaderType.JMSDeliveryMode);
+ if(mode != null)
+ {
+ deliveryMode = deliveryModeToJmsMode(mode);
+ }
+ Integer v = getHeaderAsInteger(HeaderType.JMSPriority);
+ if(v != null)
+ {
+ priority = v.intValue();
+ }
+ Long l = getHeaderAsLong(HeaderType.JMSExpiration);
+ if(l != null)
+ {
+ expiration = l.longValue();
+ }
+ /*
+ * Get the body of the message.
+ */
+ Object body = null;
+ // Check to see if any parameter has annotation. If it doesn't then don't bother
+ // checking them.
+ // NOTE(review): Method.getParameterAnnotations() returns one entry per parameter, so this
+ // test looks like it is true for any method that declares parameters - confirm intent.
+ boolean hasAnnotation = method.getParameterAnnotations().length > 0;
+ for (int i= 0; i< args.length; i++)
+ {
+ if(hasAnnotation)
+ {
+ // Parameters carrying one of these annotations are header/property values, not the body.
+ if (_context.getParameterPropertySet(method, i, Priority.class) != null)
+ {
+ continue;
+ }
+ if (_context.getParameterPropertySet(method, i, Property.class) != null)
+ {
+ continue;
+ }
+ if (_context.getParameterPropertySet(method, i, Expiration.class) != null)
+ {
+ continue;
+ }
+ if (_context.getParameterPropertySet(method, i, Delivery.class) != null)
+ {
+ continue;
+ }
+ if (_context.getParameterPropertySet(method, i, Type.class) != null)
+ {
+ continue;
+ }
+ if (_context.getParameterPropertySet(method, i, CorrelationId.class) != null)
+ {
+ continue;
+ }
+ }
+ // Any remaining parameter is the body; a second one is an error.
+ if(isBody)
+ {
+ throw new IllegalArgumentException("At most one parameter may be defined as the body of the JMS message");
+ }
+ body = args[i];
+ isBody = true;
+ }
+ /*
+ * Get the method level annotation properties.
+ */
+ Priority pri = _context.getMethodPropertySet(method,Priority.class);
+ if(pri != null && pri.value() != -1)
+ {
+ priority = pri.value();
+ }
+ Expiration exp = _context.getMethodPropertySet(method,Expiration.class);
+ if(exp != null && exp.value() != -1L)
+ {
+ expiration = exp.value();
+ }
+ Delivery del = _context.getMethodPropertySet(method,Delivery.class);
+ if(del != null && del.value() != DeliveryMode.Auto)
+ {
+ deliveryMode = deliveryModeToJmsMode(del.value());
+ }
+ Type t = _context.getMethodPropertySet(method,Type.class);
+ if(t != null && t.value().length() != 0)
+ {
+ jmsType = t.value();
+ }
+ CorrelationId id = _context.getMethodPropertySet(method,CorrelationId.class);
+ if(id != null && id.value().length() != 0)
+ {
+ correlationId = id.value();
+ }
+ /*
+ * Create a message of the appropriate type and set the body.
+ */
+ JMSControl.Message mess = _context.getMethodPropertySet(method,JMSControl.Message.class);
+ MessageType type = MessageType.Auto;
+ if(mess != null)
+ {
+ type = mess.value();
+ }
+ // With MessageType.Auto the message type is derived from the runtime type of the body.
+ if(type.equals(MessageType.Auto))
+ {
+ if(body instanceof byte[])
+ {
+ type = MessageType.Bytes;
+ }
+ // NOTE(review): when there is no body parameter this selects Map, but the Map case below
+ // runs checkBody(body,Map.class) on a null body, which appears to throw - confirm intent.
+ else if(body instanceof Map || !isBody)
+ {
+ type = MessageType.Map;
+ }
+ else if(body instanceof String || body instanceof XmlObject)
+ {
+ type = MessageType.Text;
+ }
+ else if(body instanceof javax.jms.Message)
+ {
+ type = MessageType.JMSMessage;
+ }
+ else if(body instanceof Serializable)
+ {
+ type = MessageType.Object;
+ }
+ else
+ {
+ throw new ControlException("Cannot determine message type from body");
+ }
+ }
+ switch(type)
+ {
+ case Object:
+ checkBody(body,Serializable.class);
+ m = session.createObjectMessage((Serializable)body);
+ break;
+ case Bytes:
+ BytesMessage sm;
+ checkBody(body,byte[].class);
+ m = sm = session.createBytesMessage();
+ sm.writeBytes((byte[])body);
+ break;
+ case Text:
+ // An XmlObject body is sent as its XML text.
+ if (body instanceof XmlObject)
+ {
+ body = (((XmlObject)body)).xmlText();
+ }
+ checkBody(body,String.class);
+ m = session.createTextMessage((String)body);
+ break;
+ case Map:
+ MapMessage mm;
+ checkBody(body,Map.class);
+ m = mm = session.createMapMessage();
+ Map map =(Map)body;
+ Iterator iter = map.keySet().iterator();
+ while(iter.hasNext())
+ {
+ String key = (String)iter.next();
+ mm.setObject(key,map.get(key));
+ }
+ break;
+ case JMSMessage:
+ // The body already is a JMS message and is sent as-is.
+ checkBody(body,javax.jms.Message.class);
+ m = (javax.jms.Message)body;
+ break;
+ }
+ /*
+ * Set the correlation id if given.
+ */
+ String correlationProp = props.sendCorrelationProperty();
+ if(correlationProp != null && correlationProp.length() == 0)
+ {
+ correlationProp = null;
+ }
+ // Without a method-level correlation id, fall back to the pending JMSCorrelationID header.
+ if(correlationId == null)
+ {
+ correlationId = getHeaderAsString(HeaderType.JMSCorrelationID);
+ }
+ // Properties declared on the method are copied into the message with their declared type.
+ Properties jmsProps =
+ (Properties)_context.getMethodPropertySet(method,Properties.class);
+ if(jmsProps != null && jmsProps.value() != null)
+ {
+ PropertyValue[] jprops = jmsProps.value();
+ for(int i = 0; i < jprops.length; i++)
+ {
+ PropertyValue jprop = jprops[i];
+ Class cls = jprop.type();
+ if(cls.equals(String.class))
+ {
+ m.setStringProperty(jprop.name(),jprop.value());
+ }
+ else if(cls.equals(Integer.class))
+ {
+ m.setIntProperty(jprop.name(),valueAsInteger(jprop.value()));
+ }
+ else if(cls.equals(Long.class))
+ {
+ m.setLongProperty(jprop.name(),valueAsLong(jprop.value()));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid type for property-value");
+ }
+ }
+ }
+ /*
+ * Set the properties/headers of the message from
+ * the parameters to the invoke.
+ */
+ if(hasAnnotation)
+ {
+ for (int i= 0; i< args.length; i++)
+ {
+ Property jmsProp =
+ (Property)_context.getParameterPropertySet(method,i, Property.class);
+ if (jmsProp != null)
+ {
+ m.setObjectProperty(jmsProp.name(), args[i]);
+ }
+ Priority jmsPriority =
+ (Priority)_context.getParameterPropertySet(method,i, Priority.class);
+ if (jmsPriority != null)
+ {
+ Integer p = valueAsInteger(args[i]);
+ if(p != null)
+ {
+ priority = p.intValue();
+ }
+ }
+ Expiration jmsExpiration =
+ (Expiration)_context.getParameterPropertySet(method,i, Expiration.class);
+ if (jmsExpiration != null)
+ {
+ Long e = valueAsLong(args[i]);
+ if(e != null)
+ {
+ expiration = e.longValue();
+ }
+ }
+ Delivery jmsDelivery =
+ (Delivery)_context.getParameterPropertySet(method,i, Delivery.class);
+ if (jmsDelivery != null && args[i] != null)
+ {
+ deliveryMode = deliveryModeToJmsMode(args[i]);
+ }
+ t =
+ (Type)_context.getParameterPropertySet(method,i, Type.class);
+ if (t != null && args[i] != null)
+ {
+ jmsType = (String)args[i];
+ }
+ CorrelationId jmsId =
+ (CorrelationId)_context.getParameterPropertySet(method,i, CorrelationId.class);
+ if (jmsId != null && args[i] != null)
+ {
+ correlationId = (String)args[i];
+ }
+ }
+ }
+ // The correlation id goes into the configured message property when one is set,
+ // otherwise into the JMSCorrelationID header.
+ if(correlationProp != null)
+ {
+ m.setStringProperty(correlationProp,correlationId);
+ }
+ else
+ {
+ m.setJMSCorrelationID(correlationId);
+ }
+ /*
+ * Set the headers and properties from maps provided
+ * by setProperties() and setHeaders()
+ */
+ m.setJMSExpiration(expiration);
+ m.setJMSDeliveryMode(deliveryMode);
+ m.setJMSPriority(priority);
+ setMessageHeaders(m);
+ setMessageProperties(m);
+ // Read the values back: setMessageHeaders() may have changed them on the message.
+ expiration = m.getJMSExpiration();
+ deliveryMode = m.getJMSDeliveryMode();
+ priority = m.getJMSPriority();
+ // Pending headers and properties are cleared here, before the send; this line is only
+ // reached when no exception was thrown above.
+ _headers = null;
+ _properties = null;
+ /*
+ * Send the message.
+ */
+ switch(getDestinationType())
+ {
+ case Topic:
+ ((TopicPublisher)getProducer()).publish(m,deliveryMode,priority,expiration);
+ break;
+ case Queue:
+ ((QueueSender)getProducer()).send(m,deliveryMode,priority,expiration);
+ break;
+ }
+ }
+
+ catch(JMSException e)
+ {
+ throw new ControlException("Error in sending message",e);
+ }
+ return m;
+ }
+
+
+ /*
+ * The onAcquire event handler for the resource context. Intentionally empty: nothing is
+ * acquired here.
+ */
+ @EventHandler(field = "_resourceContext", eventSet = ResourceContext.ResourceEvents.class, eventName = "onAcquire")
+ public void onAcquire()
+ {
+ }
+
+ /*
+ * The onRelease event handler for the associated resource context. This method
+ * releases the JMS producer, session and connection held by this control by
+ * delegating to close().
+ */
+ @EventHandler(field = "_resourceContext", eventSet = ResourceContext.ResourceEvents.class, eventName = "onRelease")
+ public void onRelease()
+ {
+ close();
+ }
+
+ /**
+  * Release any JMS related resources.
+  * <p>
+  * The producer, session and connection are closed independently of each other, and each field
+  * is cleared even when its close fails. A failure on one resource therefore neither prevents the
+  * remaining resources from being closed nor leaves a half-closed object cached for reuse.
+  *
+  * @throws ControlException wrapping the first {@link JMSException} raised while closing.
+  */
+ protected void close()
+ {
+     JMSException firstFailure = null;
+
+     if (_producer != null)
+     {
+         try
+         {
+             _producer.close();
+         }
+         catch (JMSException jmse)
+         {
+             firstFailure = jmse;
+         }
+         finally
+         {
+             _producer = null;
+         }
+     }
+
+     if (_session != null)
+     {
+         try
+         {
+             _session.close();
+         }
+         catch (JMSException jmse)
+         {
+             // Keep the earliest failure; it is the one reported to the caller.
+             if (firstFailure == null)
+             {
+                 firstFailure = jmse;
+             }
+         }
+         finally
+         {
+             _session = null;
+         }
+     }
+
+     if (_connection != null)
+     {
+         try
+         {
+             _connection.close();
+         }
+         catch (JMSException jmse)
+         {
+             if (firstFailure == null)
+             {
+                 firstFailure = jmse;
+             }
+         }
+         finally
+         {
+             _connection = null;
+         }
+     }
+
+     if (firstFailure != null)
+     {
+         throw new ControlException("Unable to release JMS resource", firstFailure);
+     }
+ }
+
+ /**
+ * Determine whether we are working with a queue or a topic and record the result in
+ * the destination type field.
+ * <p>
+ * The connection factory is inspected first. Only when it implements both the queue and the
+ * topic factory interfaces is the destination object inspected, and only when that is also
+ * ambiguous is a topic publisher created as a probe.
+ */
+ protected void determineDestination()
+ {
+ Destination props = getDestinationProperties();
+ String factory = props.jndiConnectionFactory();
+ ConnectionFactory cfactory = (ConnectionFactory) getJndiControl().getResource(factory,ConnectionFactory.class);
+ if(cfactory instanceof QueueConnectionFactory && cfactory instanceof TopicConnectionFactory)
+ {
+ // The factory supports both domains, so look at the destination instead.
+ javax.jms.Destination dest = getDestination();
+ if(dest instanceof Queue && dest instanceof Topic)
+ {
+ /* Try to create a topic producer...if fail then assume that it is a queue */
+ try
+ {
+ createTopicSession();
+ _producer = ((TopicSession)getSession()).createPublisher((Topic) getDestination());
+ }
+ catch(Exception e)
+ {
+ // Deliberate best-effort probe: any failure means "queue". Whatever the probe
+ // opened is released before returning.
+ close();
+ _destinationType = DestinationType.Queue;
+ return;
+ }
+ _destinationType = DestinationType.Topic;
+ }
+ else
+ {
+ // If the destination is neither a queue nor a topic the type is left unchanged.
+ if(dest instanceof javax.jms.Queue)
+ {
+ _destinationType = DestinationType.Queue;
+ }
+ if(dest instanceof javax.jms.Topic)
+ {
+ _destinationType = DestinationType.Topic;
+ }
+ }
+ }
+ else
+ {
+ // The factory supports at most one domain; if it supports neither the type is left unchanged.
+ if(cfactory instanceof QueueConnectionFactory)
+ {
+ _destinationType = DestinationType.Queue;
+ }
+ if(cfactory instanceof TopicConnectionFactory)
+ {
+ _destinationType = DestinationType.Topic;
+ }
+ }
+ }
+
+ /**
+ * Get the queue/topic producer. The producer is created on first use from the current
+ * session and destination and is cached afterwards.
+ * @return the JMS producer.
+ * @throws ControlException if a JMSException is raised while creating the producer.
+ */
+ protected MessageProducer getProducer()
+ {
+ if(_producer == null)
+ {
+ //
+ // Acquire the publisher/sender.
+ //
+ try
+ {
+ javax.jms.Session sess = getSession();
+ switch(getDestinationType())
+ {
+ case Auto:
+ // Type still undetermined: try a queue sender first and fall back to a topic
+ // publisher on any exception, including a failed cast.
+ try
+ {
+ _producer = ((QueueSession)sess).createSender((Queue) getDestination());
+ }
+ catch(Exception e)
+ {
+ _producer = ((TopicSession)sess).createPublisher((Topic) getDestination());
+ }
+ break;
+ case Topic:
+ _producer = ((TopicSession)sess).createPublisher((Topic) getDestination());
+ break;
+ case Queue:
+ _producer = ((QueueSession)sess).createSender((Queue) getDestination());
+ break;
+ }
+ } catch (JMSException jmse)
+ {
+ throw new ControlException("Unable to acquire JMS resource", jmse);
+ }
+ }
+ return _producer;
+ }
+
+
+ /**
+  * Creates a topic connection and a topic session on it, storing both in this control.
+  * The connection factory name, the transacted flag and the acknowledge mode are taken from the
+  * destination properties.
+  *
+  * @throws JMSException if the connection or the session cannot be created.
+  */
+ protected void createTopicSession() throws JMSException
+ {
+     Destination settings = getDestinationProperties();
+     String factoryName = settings.jndiConnectionFactory();
+     boolean transacted = settings.transacted();
+     AcknowledgeMode ackMode = settings.acknowledgeMode();
+
+     TopicConnectionFactory topicFactory =
+         (TopicConnectionFactory)getJndiControl().getResource(factoryName, TopicConnectionFactory.class);
+
+     // Publish the connection before creating the session, as a failed session creation
+     // must still leave the connection reachable for close().
+     _connection = topicFactory.createTopicConnection();
+     TopicConnection topicConnection = (TopicConnection)_connection;
+     _session = topicConnection.createTopicSession(transacted, modeToJmsMode(ackMode));
+ }
+
+
+ /**
+  * Creates a queue connection and a queue session on it, storing both in this control.
+  * The connection factory name, the transacted flag and the acknowledge mode are taken from the
+  * destination properties.
+  *
+  * @throws JMSException if the connection or the session cannot be created.
+  */
+ protected void createQueueSession() throws JMSException
+ {
+     Destination settings = getDestinationProperties();
+     String factoryName = settings.jndiConnectionFactory();
+     boolean transacted = settings.transacted();
+     AcknowledgeMode ackMode = settings.acknowledgeMode();
+
+     QueueConnectionFactory queueFactory =
+         (QueueConnectionFactory)getJndiControl().getResource(factoryName, QueueConnectionFactory.class);
+
+     // Publish the connection before creating the session, as a failed session creation
+     // must still leave the connection reachable for close().
+     _connection = queueFactory.createQueueConnection();
+     QueueConnection queueConnection = (QueueConnection)_connection;
+     _session = queueConnection.createQueueSession(transacted, modeToJmsMode(ackMode));
+ }
+ /**
+  * Convert the acknowledge-mode enum to the corresponding JMS session constant.
+  * Any value other than <code>Auto</code> or <code>Client</code> maps to
+  * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+  *
+  * @param mode the enum mode.
+  * @return the JMS mode.
+  */
+ protected int modeToJmsMode(AcknowledgeMode mode)
+ {
+     if (mode == AcknowledgeMode.Auto)
+     {
+         return Session.AUTO_ACKNOWLEDGE;
+     }
+     return (mode == AcknowledgeMode.Client)
+         ? Session.CLIENT_ACKNOWLEDGE
+         : Session.DUPS_OK_ACKNOWLEDGE;
+ }
+ /**
+  * Convert the object to the JMS delivery mode.
+  *
+  * @param value an integer valued string, a number, or a DeliveryMode of Persistent or
+  *              NonPersistent.
+  * @return the JMS delivery mode.
+  * @throws IllegalArgumentException if the value is of any other kind; this includes a string
+  *         that is not a valid integer.
+  */
+ protected int deliveryModeToJmsMode(Object value)
+ {
+     // Only the two concrete enum modes translate; other enum values fall through to the error.
+     if (value == DeliveryMode.Persistent)
+     {
+         return javax.jms.DeliveryMode.PERSISTENT;
+     }
+     if (value == DeliveryMode.NonPersistent)
+     {
+         return javax.jms.DeliveryMode.NON_PERSISTENT;
+     }
+     if (value instanceof Number)
+     {
+         Number numeric = (Number)value;
+         return numeric.intValue();
+     }
+     if (value instanceof String)
+     {
+         String text = (String)value;
+         return Integer.parseInt(text);
+     }
+     throw new IllegalArgumentException("Invalid delivery mode value");
+ }
+ /**
+  * Verify that the message body is an instance of the class required by the message type.
+  * A <code>null</code> body never passes the check.
+  *
+  * @param value the body.
+  * @param cls the expected class.
+  * @throws ControlException if the body is not an instance of the expected class.
+  */
+ protected void checkBody(Object value,Class cls) throws ControlException
+ {
+     if (cls.isInstance(value))
+     {
+         return;
+     }
+     throw new ControlException("Message body is not of correct type expected "+cls.getName());
+ }
+ /**
+ * Get the destination annotation info for the message, read from the control property
+ * set of the control bean context.
+ * @return the destination annotation.
+ */
+ protected Destination getDestinationProperties()
+ {
+ return (Destination) _context.getControlPropertySet(Destination.class);
+ }
+
+ /**
+  * Get the topic/queue state.
+  * A type that has already been resolved is returned as is. Otherwise the <code>sendType</code>
+  * of the destination annotation is consulted; it is cached when it is not <code>Auto</code>.
+  *
+  * @return the destination type, which is still <code>Auto</code> when neither the cached
+  *         state nor the annotation decides it.
+  */
+ protected DestinationType getDestinationType() throws ControlException
+ {
+     if (_destinationType == DestinationType.Auto)
+     {
+         DestinationType declared = getDestinationProperties().sendType();
+         if (declared != DestinationType.Auto)
+         {
+             _destinationType = declared;
+         }
+         return declared;
+     }
+     return _destinationType;
+ }
+
+
+
+
+ /**
+  * Copy the pending properties, registered through setProperty()/setProperties(), into the
+  * given message. Does nothing when no properties are pending.
+  *
+  * @param msg the message.
+  * @throws ControlException if the message rejects a property; the underlying
+  *         {@link JMSException} is attached as the cause.
+  */
+ protected void setMessageProperties(javax.jms.Message msg)
+ throws ControlException
+ {
+     if(_properties == null)
+     {
+         return;
+     }
+     for (Map.Entry<String,Object> entry : _properties.entrySet())
+     {
+         String name = entry.getKey();
+         try
+         {
+             msg.setObjectProperty(name,entry.getValue());
+         }
+         catch(JMSException e)
+         {
+             // Keep the provider's exception as the cause so the reason is not lost.
+             throw new ControlException("Cannot set property '"+name+"' into JMS message", e);
+         }
+     }
+ }
+
+ /**
+  * Apply every pending header, registered through setHeader()/setHeaders(), to the given
+  * message. Does nothing when no headers are pending.
+  *
+  * @param msg the message.
+  * @throws ControlException if a header cannot be set on the message.
+  */
+ protected void setMessageHeaders(javax.jms.Message msg)
+ throws ControlException
+ {
+     if (_headers != null)
+     {
+         for (Map.Entry<HeaderType,Object> pending : _headers.entrySet())
+         {
+             setMessageHeader(msg, pending.getKey(), pending.getValue());
+         }
+     }
+ }
+ /**
+  * Set the header value of the given message.
+  * <p>
+  * Only JMSCorrelationID, JMSPriority, JMSExpiration and JMSType are written to the message;
+  * any other header type is ignored by this method. A value whose runtime type is not
+  * supported for its header is ignored as well, except for JMSType which requires a String.
+  *
+  * @param msg the message.
+  * @param type the header type.
+  * @param value the value: a String or byte[] for JMSCorrelationID, an Integer or integer
+  *              valued String for JMSPriority, a Long or long valued String for JMSExpiration,
+  *              a String for JMSType.
+  * @throws ControlException if the message rejects the header value.
+  * @throws NumberFormatException if a String priority or expiration is not a valid number.
+  */
+ protected void setMessageHeader(javax.jms.Message msg,HeaderType type,Object value)
+ {
+     switch(type)
+     {
+         case JMSCorrelationID:
+             try
+             {
+                 if (value instanceof byte[])
+                 {
+                     msg.setJMSCorrelationIDAsBytes((byte[])value);
+                 }
+                 else if (value instanceof String)
+                 {
+                     msg.setJMSCorrelationID((String)value);
+                 }
+             }
+             catch (javax.jms.JMSException e)
+             {
+                 throw new ControlException("Error setting JMSCorrelationID for message", e);
+             }
+             break;
+         case JMSPriority:
+             try
+             {
+                 if (value instanceof Integer)
+                 {
+                     msg.setJMSPriority(((Integer)value).intValue());
+                 }
+                 else if (value instanceof String)
+                 {
+                     // Parse the text itself. Integer.getInteger() would look up a system
+                     // property of that name and yield null for an ordinary number.
+                     msg.setJMSPriority(Integer.parseInt((String)value));
+                 }
+             }
+             catch (javax.jms.JMSException e)
+             {
+                 throw new ControlException("Error setting JMSPriority for message", e);
+             }
+             break;
+         case JMSExpiration:
+             try
+             {
+                 if (value instanceof Long)
+                 {
+                     msg.setJMSExpiration(((Long)value).longValue());
+                 }
+                 else if (value instanceof String)
+                 {
+                     msg.setJMSExpiration(Long.parseLong((String)value));
+                 }
+             }
+             catch (javax.jms.JMSException e)
+             {
+                 throw new ControlException("Error setting JMSExpiration for message", e);
+             }
+             break;
+         case JMSType:
+             try
+             {
+                 msg.setJMSType((String)value);
+             }
+             catch (javax.jms.JMSException e)
+             {
+                 throw new ControlException("Error setting JMSType for message", e);
+             }
+             break;
+     }
+ }
+
+ /**
+  * Get the pending value registered for the given header type.
+  *
+  * @param name the header type.
+  * @return the value, or null when no headers are pending or none is registered for the type.
+  */
+ protected Object getHeader(HeaderType name)
+ {
+     if(_headers == null)
+     {
+         return null;
+     }
+     // The map is keyed by HeaderType, so look up with the enum value itself; a lookup by
+     // name.toString() can never match a key.
+     return _headers.get(name);
+ }
+ /**
+  * Get the pending value of the given header type in its string form.
+  *
+  * @param name the header type.
+  * @return the result of <code>toString()</code> on the value, or null when there is no value.
+  */
+ protected String getHeaderAsString(HeaderType name)
+ {
+     Object headerValue = getHeader(name);
+     return (headerValue == null) ? null : headerValue.toString();
+ }
+
+ /**
+  * Get the pending value of the given header type as an integer.
+  *
+  * @param name the header type.
+  * @return the integer value, or null when there is no value or it is neither a string nor a
+  *         number.
+  */
+ protected Integer getHeaderAsInteger(HeaderType name)
+ {
+     // Single lookup; the value is converted by valueAsInteger().
+     return valueAsInteger(getHeader(name));
+ }
+ /**
+  * Convert the given value to an integer.
+  *
+  * @param obj a string holding an integer, or a number object.
+  * @return the integer value, or null when the value is neither a string nor a number.
+  * @throws NumberFormatException if a string value is not a valid integer.
+  */
+ protected Integer valueAsInteger(Object obj)
+ {
+     if (obj instanceof String)
+     {
+         String text = (String)obj;
+         return Integer.parseInt(text);
+     }
+     if (obj instanceof Integer)
+     {
+         // Already the right type; returned unchanged.
+         return (Integer)obj;
+     }
+     if (obj instanceof Number)
+     {
+         Number numeric = (Number)obj;
+         return new Integer(numeric.intValue());
+     }
+     return null;
+ }
+ /**
+ * Get the header type value as a long. The pending header value is looked up with
+ * getHeader() and converted with valueAsLong().
+ * @param name the header type.
+ * @return the long value or null.
+ */
+ protected Long getHeaderAsLong(HeaderType name)
+ {
+ return valueAsLong(getHeader(name));
+ }
+ /**
+  * Convert the given value to a long.
+  *
+  * @param obj a string holding a long, or a number object.
+  * @return the long value, or null when the value is neither a string nor a number.
+  * @throws NumberFormatException if a string value is not a valid long.
+  */
+ protected Long valueAsLong(Object obj)
+ {
+     if (obj instanceof String)
+     {
+         String text = (String)obj;
+         return Long.parseLong(text);
+     }
+     if (obj instanceof Long)
+     {
+         // Already the right type; returned unchanged.
+         return (Long)obj;
+     }
+     if (obj instanceof Number)
+     {
+         Number numeric = (Number)obj;
+         return new Long(numeric.longValue());
+     }
+     return null;
+ }
+ /**
+  * Get the JNDI control instance.
+  * <p>
+  * On the first call the control is configured from the destination annotation: context
+  * factory, provider URL, user name and password are each applied only when they are not
+  * empty. Later calls return the control without touching its configuration.
+  *
+  * @return the jndi-control.
+  */
+ protected JndiControlBean getJndiControl()
+ {
+     if (_jndiInitialized)
+     {
+         return _jndiControl;
+     }
+
+     // Flag first, as in the original flow: configuration is attempted only once.
+     _jndiInitialized = true;
+     Destination settings = getDestinationProperties();
+
+     String contextFactory = nullIfEmpty(settings.jndiContextFactory());
+     if (contextFactory != null)
+     {
+         _jndiControl.setFactory(contextFactory);
+     }
+
+     String providerURL = nullIfEmpty(settings.jndiProviderURL());
+     if (providerURL != null)
+     {
+         _jndiControl.setUrl(providerURL);
+     }
+
+     String principal = nullIfEmpty(settings.jndiUsername());
+     if (principal != null)
+     {
+         _jndiControl.setJndiSecurityPrincipal(principal);
+     }
+
+     String credentials = nullIfEmpty(settings.jndiPassword());
+     if (credentials != null)
+     {
+         _jndiControl.setJndiSecurityCredentials(credentials);
+     }
+
+     return _jndiControl;
+ }
+
+ /**
+  * Normalize a string so that "no value" is always represented by null.
+  *
+  * @param str a string.
+  * @return null if the string is null, empty or only whitespace; otherwise the string itself,
+  *         untrimmed.
+  */
+ protected String nullIfEmpty(String str)
+ {
+     boolean hasText = (str != null) && (str.trim().length() != 0);
+     return hasText ? str : null;
+ }
+ /** The resource context whose onAcquire/onRelease events this control handles. */
+ @Context
+ private ResourceContext _resourceContext;
+
+ /** The control bean context, used to read control, method and parameter property sets. */
+ @Context
+ private ControlBeanContext _context;
+
+ /** The JNDI control used to look up the connection factory and the destination. */
+ @Control
+ private JndiControlBean _jndiControl;
+
+ /** The destination */
+ private transient javax.jms.Destination _destination;
+
+ /** The resolved destination type; Auto until it has been determined. */
+ private transient DestinationType _destinationType = DestinationType.Auto;
+
+ /** The connection. */
+ private transient javax.jms.Connection _connection;
+
+ /** The session */
+ private transient Session _session;
+
+ /** The message producer. */
+ private transient MessageProducer _producer;
+
+ /** The JNDI control has been initialized */
+ private boolean _jndiInitialized = false;
+
+ /** The headers to set in the next message to be sent. */
+ private Map<HeaderType,Object> _headers;
+
+ /** The properties to set in the next message to be sent. */
+ private Map<String,Object> _properties;
+
+}