You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/02/15 17:37:22 UTC
svn commit: r507994 [1/2] - in
/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src:
main/java/org/apache/servicemix/jms/
main/java/org/apache/servicemix/jms/endpoint/
test/java/org/apache/servicemix/jms/
Author: gnodet
Date: Thu Feb 15 08:37:20 2007
New Revision: 507994
URL: http://svn.apache.org/viewvc?view=rev&rev=507994
Log:
SM-537, SM-724, SM-510: add several new jms endpoints that use marshalers
Added:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsConsumerEndpointTest.java (with props)
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (with props)
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsComponent.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsComponent.java?view=diff&rev=507994&r1=507993&r2=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsComponent.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsComponent.java Thu Feb 15 08:37:20 2007
@@ -17,6 +17,7 @@
package org.apache.servicemix.jms;
import java.net.URI;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,8 @@
import org.apache.servicemix.jbi.security.keystore.KeystoreManager;
import org.apache.servicemix.jbi.util.IntrospectionSupport;
import org.apache.servicemix.jbi.util.URISupport;
+import org.apache.servicemix.jms.endpoint.JmsConsumerEndpoint;
+import org.apache.servicemix.jms.endpoint.JmsProviderEndpoint;
/**
*
@@ -43,14 +46,16 @@
public class JmsComponent extends DefaultComponent {
protected JmsConfiguration configuration = new JmsConfiguration();
- protected JmsEndpoint[] endpoints;
+ protected JmsEndpointType[] endpoints;
protected List getConfiguredEndpoints() {
- return asList(endpoints);
+ return Arrays.asList(endpoints);
}
protected Class[] getEndpointClasses() {
- return new Class[] { JmsEndpoint.class };
+ return new Class[] { JmsEndpoint.class,
+ JmsConsumerEndpoint.class,
+ JmsProviderEndpoint.class };
}
/**
@@ -65,11 +70,11 @@
this.configuration = configuration;
}
- public JmsEndpoint[] getEndpoints() {
+ public JmsEndpointType[] getEndpoints() {
return endpoints;
}
- public void setEndpoints(JmsEndpoint[] endpoints) {
+ public void setEndpoints(JmsEndpointType[] endpoints) {
this.endpoints = endpoints;
}
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java?view=diff&rev=507994&r1=507993&r2=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java Thu Feb 15 08:37:20 2007
@@ -45,7 +45,7 @@
* description="A jms endpoint"
*
*/
-public class JmsEndpoint extends SoapEndpoint {
+public class JmsEndpoint extends SoapEndpoint implements JmsEndpointType {
//
// Jms informations
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,5 @@
+package org.apache.servicemix.jms;
+
+public interface JmsEndpointType {
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpointType.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+import org.apache.servicemix.jms.endpoint.JmsConsumerMarshaler.JmsContext;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
+import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.jms.support.destination.DynamicDestinationResolver;
+
+public abstract class AbstractConsumerEndpoint extends ConsumerEndpoint {
+
+ private JmsConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
+ private boolean synchronous = true;
+ private DestinationChooser destinationChooser;
+ private DestinationResolver destinationResolver = new DynamicDestinationResolver();
+ private boolean pubSubDomain = false;
+ private ConnectionFactory connectionFactory;
+ private JmsTemplate template;
+
+ // Reply properties
+ private Boolean useMessageIdInResponse;
+ private Destination replyDestination;
+ private String replyDestinationName;
+ private boolean replyExplicitQosEnabled = false;
+ private int replyDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ private int replyPriority = Message.DEFAULT_PRIORITY;
+ private long replyTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ private Map replyProperties;
+
+ private Map<String, JmsContext> pendingExchanges;
+
+ /**
+ * @return the destinationChooser
+ */
+ public DestinationChooser getDestinationChooser() {
+ return destinationChooser;
+ }
+
+ /**
+ * @param destinationChooser the destinationChooser to set
+ */
+ public void setDestinationChooser(DestinationChooser destinationChooser) {
+ this.destinationChooser = destinationChooser;
+ }
+
+ /**
+ * @return the replyDeliveryMode
+ */
+ public int getReplyDeliveryMode() {
+ return replyDeliveryMode;
+ }
+
+ /**
+ * @param replyDeliveryMode the replyDeliveryMode to set
+ */
+ public void setReplyDeliveryMode(int replyDeliveryMode) {
+ this.replyDeliveryMode = replyDeliveryMode;
+ }
+
+ /**
+ * @return the replyDestination
+ */
+ public Destination getReplyDestination() {
+ return replyDestination;
+ }
+
+ /**
+ * @param replyDestination the replyDestination to set
+ */
+ public void setReplyDestination(Destination replyDestination) {
+ this.replyDestination = replyDestination;
+ }
+
+ /**
+ * @return the replyDestinationName
+ */
+ public String getReplyDestinationName() {
+ return replyDestinationName;
+ }
+
+ /**
+ * @param replyDestinationName the replyDestinationName to set
+ */
+ public void setReplyDestinationName(String replyDestinationName) {
+ this.replyDestinationName = replyDestinationName;
+ }
+
+ /**
+ * @return the replyExplicitQosEnabled
+ */
+ public boolean isReplyExplicitQosEnabled() {
+ return replyExplicitQosEnabled;
+ }
+
+ /**
+ * @param replyExplicitQosEnabled the replyExplicitQosEnabled to set
+ */
+ public void setReplyExplicitQosEnabled(boolean replyExplicitQosEnabled) {
+ this.replyExplicitQosEnabled = replyExplicitQosEnabled;
+ }
+
+ /**
+ * @return the replyPriority
+ */
+ public int getReplyPriority() {
+ return replyPriority;
+ }
+
+ /**
+ * @param replyPriority the replyPriority to set
+ */
+ public void setReplyPriority(int replyPriority) {
+ this.replyPriority = replyPriority;
+ }
+
+ /**
+ * @return the replyProperties
+ */
+ public Map getReplyProperties() {
+ return replyProperties;
+ }
+
+ /**
+ * @param replyProperties the replyProperties to set
+ */
+ public void setReplyProperties(Map replyProperties) {
+ this.replyProperties = replyProperties;
+ }
+
+ /**
+ * @return the replyTimeToLive
+ */
+ public long getReplyTimeToLive() {
+ return replyTimeToLive;
+ }
+
+ /**
+ * @param replyTimeToLive the replyTimeToLive to set
+ */
+ public void setReplyTimeToLive(long replyTimeToLive) {
+ this.replyTimeToLive = replyTimeToLive;
+ }
+
+ /**
+ * @return the useMessageIdInResponse
+ */
+ public Boolean getUseMessageIdInResponse() {
+ return useMessageIdInResponse;
+ }
+
+ /**
+ * @param useMessageIdInResponse the useMessageIdInResponse to set
+ */
+ public void setUseMessageIdInResponse(Boolean useMessageIdInResponse) {
+ this.useMessageIdInResponse = useMessageIdInResponse;
+ }
+
+ /**
+ * @return the connectionFactory
+ */
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ /**
+ * @param connectionFactory the connectionFactory to set
+ */
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * @return the pubSubDomain
+ */
+ public boolean isPubSubDomain() {
+ return pubSubDomain;
+ }
+
+ /**
+ * @param pubSubDomain the pubSubDomain to set
+ */
+ public void setPubSubDomain(boolean pubSubDomain) {
+ this.pubSubDomain = pubSubDomain;
+ }
+
+ /**
+ * @return the destinationResolver
+ */
+ public DestinationResolver getDestinationResolver() {
+ return destinationResolver;
+ }
+
+ /**
+ * @param destinationResolver the destinationResolver to set
+ */
+ public void setDestinationResolver(DestinationResolver destinationResolver) {
+ this.destinationResolver = destinationResolver;
+ }
+
+ /**
+ * @return the marshaler
+ */
+ public JmsConsumerMarshaler getMarshaler() {
+ return marshaler;
+ }
+
+ /**
+ * @param marshaler the marshaler to set
+ */
+ public void setMarshaler(JmsConsumerMarshaler marshaler) {
+ this.marshaler = marshaler;
+ }
+
+ /**
+ * @return the synchronous
+ */
+ public boolean isSynchronous() {
+ return synchronous;
+ }
+
+ /**
+ * @param synchronous the synchronous to set
+ */
+ public void setSynchronous(boolean synchronous) {
+ this.synchronous = synchronous;
+ }
+
+ public String getLocationURI() {
+ // TODO: Need to return a real URI
+ return getService() + "#" + getEndpoint();
+ }
+
+ public synchronized void start() throws Exception {
+ super.start();
+ if (template == null) {
+ template = new JmsTemplate(getConnectionFactory());
+ }
+ pendingExchanges = new ConcurrentHashMap<String, JmsContext>();
+ }
+
+ public synchronized void stop() throws Exception {
+ pendingExchanges.clear();
+ pendingExchanges = null;
+ super.stop();
+ }
+
+ public void process(MessageExchange exchange) throws Exception {
+ JmsContext context = pendingExchanges.remove(exchange.getExchangeId());
+ processExchange(exchange, null, context);
+ }
+
+ protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
+ // Ignore DONE exchanges
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ }
+ // Create session if needed
+ if (session == null) {
+ template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ try {
+ processExchange(exchange, session, context);
+ } catch (Exception e) {
+ throw new ListenerExecutionFailedException("Exchange processing failed", e);
+ }
+ return null;
+ }
+ });
+ }
+ // Handle exchanges
+ Message msg = null;
+ Destination dest = null;
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ if (exchange.getFault() != null) {
+ msg = marshaler.createFault(exchange, exchange.getFault(), session, context);
+ dest = getReplyDestination(exchange, exchange.getFault(), session, context);
+ } else if (exchange.getMessage("out") != null) {
+ msg = marshaler.createOut(exchange, exchange.getMessage("out"), session, context);
+ dest = getReplyDestination(exchange, exchange.getMessage("out"), session, context);
+ }
+ if (msg == null) {
+ throw new IllegalStateException("Unable to send back answer or fault");
+ }
+ setCorrelationId(context.getMessage(), msg);
+ send(msg, session, dest);
+ done(exchange);
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ Exception error = exchange.getError();
+ if (error == null) {
+ error = new JBIException("Exchange in ERROR state, but no exception provided");
+ }
+ msg = marshaler.createError(exchange, error, session, context);
+ dest = getReplyDestination(exchange, error, session, context);
+ setCorrelationId(context.getMessage(), msg);
+ send(msg, session, dest);
+ } else {
+ throw new IllegalStateException("Unrecognized exchange status");
+ }
+ }
+
+ protected void send(Message msg, Session session, Destination dest) throws JMSException {
+ MessageProducer producer = session.createProducer(dest);
+ try {
+ if (replyProperties != null) {
+ for (Iterator it = replyProperties.entrySet().iterator(); it.hasNext();) {
+ Map.Entry e = (Map.Entry) it.next();
+ msg.setObjectProperty(e.getKey().toString(), e.getValue());
+ }
+ }
+ if (replyExplicitQosEnabled) {
+ producer.send(msg, replyDeliveryMode, replyPriority, replyTimeToLive);
+ } else {
+ producer.send(msg);
+ }
+ } finally {
+ JmsUtils.closeMessageProducer(producer);
+ }
+ }
+
+ protected void onMessage(Message jmsMessage, Session session) throws JMSException {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Received: " + jmsMessage);
+ }
+ try {
+ JmsContext context = marshaler.createContext(jmsMessage, getContext());
+ MessageExchange exchange = marshaler.createExchange(context);
+ configureExchangeTarget(exchange);
+ if (synchronous) {
+ sendSync(exchange);
+ if (exchange.getStatus() != ExchangeStatus.DONE) {
+ processExchange(exchange, session, context);
+ }
+ } else {
+ pendingExchanges.put(exchange.getExchangeId(), context);
+ send(exchange);
+ }
+ } catch (JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ throw (JMSException) new JMSException("Error sending JBI exchange").initCause(e);
+ }
+ }
+
+ protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session, JmsContext context) throws JMSException {
+ // If a JMS ReplyTo property is set, use it
+ if (context.getMessage().getJMSReplyTo() != null) {
+ return context.getMessage().getJMSReplyTo();
+ }
+ Object dest = null;
+ // Let the destinationChooser a chance to choose the destination
+ if (destinationChooser != null) {
+ dest = destinationChooser.chooseDestination(exchange, message);
+ }
+ // Default to replyDestination / replyDestinationName properties
+ if (dest == null) {
+ dest = replyDestination;
+ }
+ if (dest == null) {
+ dest = replyDestinationName;
+ }
+ // Resolve destination if needed
+ if (dest instanceof Destination) {
+ return (Destination) dest;
+ } else if (dest instanceof String) {
+ return destinationResolver.resolveDestinationName(session,
+ (String) dest,
+ isPubSubDomain());
+ }
+ throw new IllegalStateException("Unable to choose destination for exchange " + exchange);
+ }
+
+ protected void setCorrelationId(Message query, Message reply) throws Exception {
+ if (useMessageIdInResponse == null) {
+ if (query.getJMSCorrelationID() != null) {
+ reply.setJMSCorrelationID(query.getJMSCorrelationID());
+ } else if (query.getJMSMessageID() != null) {
+ reply.setJMSCorrelationID(query.getJMSMessageID());
+ } else {
+ throw new IllegalStateException("No JMSCorrelationID or JMSMessageID set on query message");
+ }
+ } else if (useMessageIdInResponse.booleanValue()) {
+ if (query.getJMSMessageID() != null) {
+ reply.setJMSCorrelationID(query.getJMSMessageID());
+ } else {
+ throw new IllegalStateException("No JMSMessageID set on query message");
+ }
+ } else {
+ if (query.getJMSCorrelationID() != null) {
+ reply.setJMSCorrelationID(query.getJMSCorrelationID());
+ } else {
+ throw new IllegalStateException("No JMSCorrelationID set on query message");
+ }
+ }
+ }
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/AbstractConsumerEndpoint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import java.net.URI;
+
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.xml.transform.Source;
+
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+
+public class DefaultConsumerMarshaler implements JmsConsumerMarshaler {
+
+ private URI mep;
+
+ public DefaultConsumerMarshaler() {
+ this.mep = MessageExchangeSupport.IN_ONLY;
+ }
+
+ public DefaultConsumerMarshaler(URI mep) {
+ this.mep = mep;
+ }
+
+ /**
+ * @return the mep
+ */
+ public URI getMep() {
+ return mep;
+ }
+
+ /**
+ * @param mep the mep to set
+ */
+ public void setMep(URI mep) {
+ this.mep = mep;
+ }
+
+ public JmsContext createContext(Message message, ComponentContext context) throws Exception {
+ return new Context(message, context);
+ }
+
+ public MessageExchange createExchange(JmsContext context) throws Exception {
+ Context ctx = (Context) context;
+ MessageExchange exchange = ctx.componentContext.getDeliveryChannel().createExchangeFactory().createExchange(mep);
+ NormalizedMessage inMessage = exchange.createMessage();
+ populateMessage(ctx.message, inMessage);
+ exchange.setMessage(inMessage, "in");
+ return exchange;
+ }
+
+ public Message createOut(MessageExchange exchange, NormalizedMessage outMsg, Session session, JmsContext context) throws Exception {
+ String text = new SourceTransformer().contentToString(outMsg);
+ TextMessage msg = session.createTextMessage(text);
+ return msg;
+ }
+
+ public Message createFault(MessageExchange exchange, Fault fault, Session session, JmsContext context) throws Exception {
+ String text = new SourceTransformer().contentToString(fault);
+ TextMessage msg = session.createTextMessage(text);
+ return msg;
+ }
+
+ public Message createError(MessageExchange exchange, Exception error, Session session, JmsContext context) throws Exception {
+ throw error;
+ }
+
+ protected void populateMessage(Message message, NormalizedMessage normalizedMessage) throws Exception {
+ if (message instanceof TextMessage) {
+ TextMessage textMessage = (TextMessage) message;
+ Source source = new StringSource(textMessage.getText());
+ normalizedMessage.setContent(source);
+ } else {
+ throw new UnsupportedOperationException("JMS message is not a TextMessage");
+ }
+ }
+
+ protected static class Context implements JmsContext {
+ Message message;
+ ComponentContext componentContext;
+ Context(Message message, ComponentContext componentContext) {
+ this.message = message;
+ this.componentContext = componentContext;
+ }
+ public Message getMessage() {
+ return this.message;
+ }
+ }
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultConsumerMarshaler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,48 @@
+package org.apache.servicemix.jms.endpoint;
+
+import java.util.Map;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+
+public class DefaultProviderMarshaler implements JmsProviderMarshaler {
+
+ private Map<String, Object> jmsProperties;
+ private SourceTransformer transformer = new SourceTransformer();
+
+ /**
+ * @return the jmsProperties
+ */
+ public Map<String, Object> getJmsProperties() {
+ return jmsProperties;
+ }
+
+ /**
+ * @param jmsProperties the jmsProperties to set
+ */
+ public void setJmsProperties(Map<String, Object> jmsProperties) {
+ this.jmsProperties = jmsProperties;
+ }
+
+ public Message createMessage(MessageExchange exchange, NormalizedMessage in, Session session) throws Exception {
+ TextMessage text = session.createTextMessage();
+ text.setText(transformer.contentToString(in));
+ if (jmsProperties != null) {
+ for (Map.Entry<String, Object> e : jmsProperties.entrySet()) {
+ text.setObjectProperty(e.getKey(), e.getValue());
+ }
+ }
+ return text;
+ }
+
+ public Object getDestination(MessageExchange exchange) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DefaultProviderMarshaler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import javax.jbi.messaging.MessageExchange;
+
+/**
+ * A pluggable strategy used to decide which JMS Destination to use for an outbound JMS message
+ *
+ * @version $Revision$
+ */
+public interface DestinationChooser {
+
+ /**
+ * Chooses which JMS destintation to use for the given message.
+ * The message may be the "in", "out" or "fault" message.
+ *
+ * @param exchange the exchange
+ * @param message the message can be a javax.jbi.messaging.NormalizedMessage,
+ * a javax.jbi.messaging.Fault or an Exception
+ * @return a javax.jms.Destination or String for the destination name
+ */
+ Object chooseDestination(MessageExchange exchange, Object message);
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/DestinationChooser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import javax.jbi.management.DeploymentException;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.transaction.TransactionManager;
+
+import org.apache.servicemix.jms.JmsEndpointType;
+import org.springframework.jms.connection.JmsTransactionManager;
+import org.springframework.jms.connection.JmsTransactionManager102;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer102;
+import org.springframework.jms.listener.serversession.ServerSessionFactory;
+import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer;
+import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.jta.JtaTransactionManager;
+
+/**
+ *
+ * @author gnodet
+ * @org.apache.xbean.XBean element="consumer"
+ * @since 3.2
+ */
+public class JmsConsumerEndpoint extends AbstractConsumerEndpoint implements JmsEndpointType {
+
+ public static final String LISTENER_TYPE_DEFAULT = "default";
+ public static final String LISTENER_TYPE_SIMPLE = "simple";
+ public static final String LISTENER_TYPE_SERVER = "server";
+
+ public static final String TRANSACTED_NONE = "none";
+ public static final String TRANSACTED_XA = "xa";
+ public static final String TRANSACTED_JMS = "jms";
+
+ // type of listener
+ private String listenerType = LISTENER_TYPE_DEFAULT;
+ private boolean jms102 = false;
+ private String transacted = TRANSACTED_NONE;
+
+ // Standard jms properties
+ private String clientId;
+ private Destination destination;
+ private String destinationName;
+ private String durableSubscriptionName;
+ private ExceptionListener exceptionListener;
+ private String messageSelector;
+ private int sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ private boolean subscriptionDurable;
+
+ // simple and default listener properties
+ private boolean pubSubNoLocal = false;
+ private int concurrentConsumers = 1;
+
+ // default listener properties
+ private int cacheLevel = DefaultMessageListenerContainer.CACHE_NONE;
+ private long receiveTimeout = DefaultMessageListenerContainer.DEFAULT_RECEIVE_TIMEOUT;
+ private long recoveryInterval = DefaultMessageListenerContainer.DEFAULT_RECOVERY_INTERVAL;
+
+ // default and server listener properties
+ private int maxMessagesPerTask = Integer.MIN_VALUE;
+
+ // server listener properties
+ private ServerSessionFactory serverSessionFactory;
+
+ private AbstractMessageListenerContainer listenerContainer;
+
+ /**
+ * @return the transacted
+ */
+ public String getTransacted() {
+ return transacted;
+ }
+
+ /**
+ * @param transacted the transacted to set
+ */
+ public void setTransacted(String transacted) {
+ this.transacted = transacted;
+ }
+
+ /**
+ * @return the cacheLevel
+ */
+ public int getCacheLevel() {
+ return cacheLevel;
+ }
+
+ /**
+ * @param cacheLevel the cacheLevel to set
+ * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel(int)
+ */
+ public void setCacheLevel(int cacheLevel) {
+ this.cacheLevel = cacheLevel;
+ }
+
+ /**
+ * @return the clientId
+ */
+ public String getClientId() {
+ return clientId;
+ }
+
+ /**
+ * @param clientId the clientId to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setClientId(String)
+ */
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ /**
+ * @return the concurrentConsumers
+ */
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
+ /**
+ * @param concurrentConsumers the concurrentConsumers to set
+ * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setConcurrentConsumers(int)
+ * @see org.springframework.jms.listener.SimpleMessageListenerContainer#setConcurrentConsumers(int)
+ */
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
+
+ /**
+ * @return the destination
+ */
+ public Destination getDestination() {
+ return destination;
+ }
+
+ /**
+ * @param destination the destination to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setDestination(Destination)
+ */
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @return the destinationName
+ */
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ /**
+ * @param destinationName the destinationName to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setDestinationName(String)
+ */
+ public void setDestinationName(String destinationName) {
+ this.destinationName = destinationName;
+ }
+
+ /**
+ * @return the durableSubscriptionName
+ */
+ public String getDurableSubscriptionName() {
+ return durableSubscriptionName;
+ }
+
+ /**
+ * @param durableSubscriptionName the durableSubscriptionName to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setDurableSubscriptionName(String)
+ */
+ public void setDurableSubscriptionName(String durableSubscriptionName) {
+ this.durableSubscriptionName = durableSubscriptionName;
+ }
+
+ /**
+ * @return the exceptionListener
+ */
+ public ExceptionListener getExceptionListener() {
+ return exceptionListener;
+ }
+
+ /**
+ * @param exceptionListener the exceptionListener to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setExceptionListener(ExceptionListener)
+ */
+ public void setExceptionListener(ExceptionListener exceptionListener) {
+ this.exceptionListener = exceptionListener;
+ }
+
+ /**
+ * @return the jms102
+ */
+ public boolean isJms102() {
+ return jms102;
+ }
+
+ /**
+ * @param jms102 the jms102 to set
+ */
+ public void setJms102(boolean jms102) {
+ this.jms102 = jms102;
+ }
+
+ /**
+ * @return the listenerType
+ */
+ public String getListenerType() {
+ return listenerType;
+ }
+
+ /**
+ * @param listenerType the listenerType to set
+ */
+ public void setListenerType(String listenerType) {
+ this.listenerType = listenerType;
+ }
+
+ /**
+ * @return the maxMessagesPerTask
+ */
+ public int getMaxMessagesPerTask() {
+ return maxMessagesPerTask;
+ }
+
+ /**
+ * @param maxMessagesPerTask the maxMessagesPerTask to set
+ * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setMaxMessagesPerTask(int)
+ * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer#setMaxMessagesPerTask(int)
+ */
+ public void setMaxMessagesPerTask(int maxMessagesPerTask) {
+ this.maxMessagesPerTask = maxMessagesPerTask;
+ }
+
+ /**
+ * @return the messageSelector
+ */
+ public String getMessageSelector() {
+ return messageSelector;
+ }
+
+ /**
+ * @param messageSelector the messageSelector to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setMessageSelector(String)
+ */
+ public void setMessageSelector(String messageSelector) {
+ this.messageSelector = messageSelector;
+ }
+
+ /**
+ * @return the pubSubNoLocal
+ */
+ public boolean isPubSubNoLocal() {
+ return pubSubNoLocal;
+ }
+
+ /**
+ * @param pubSubNoLocal the pubSubNoLocal to set
+ * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setPubSubNoLocal(boolean)
+ * @see org.springframework.jms.listener.SimpleMessageListenerContainer#setPubSubNoLocal(boolean)
+ */
+ public void setPubSubNoLocal(boolean pubSubNoLocal) {
+ this.pubSubNoLocal = pubSubNoLocal;
+ }
+
+ /**
+ * @return the receiveTimeout
+ */
+ public long getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ /**
+ * @param receiveTimeout the receiveTimeout to set
+ * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setReceiveTimeout(long)
+ */
+ public void setReceiveTimeout(long receiveTimeout) {
+ this.receiveTimeout = receiveTimeout;
+ }
+
+ /**
+ * @return the recoveryInterval
+ */
+ public long getRecoveryInterval() {
+ return recoveryInterval;
+ }
+
+ /**
+ * @param recoveryInterval the recoveryInterval to set
+ * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setRecoveryInterval(long)
+ */
+ public void setRecoveryInterval(long recoveryInterval) {
+ this.recoveryInterval = recoveryInterval;
+ }
+
+ /**
+ * @return the serverSessionFactory
+ */
+ public ServerSessionFactory getServerSessionFactory() {
+ return serverSessionFactory;
+ }
+
+ /**
+ * @param serverSessionFactory the serverSessionFactory to set
+ * @see org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer#setServerSessionFactory(ServerSessionFactory)
+ */
+ public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
+ this.serverSessionFactory = serverSessionFactory;
+ }
+
+ /**
+ * @return the sessionAcknowledgeMode
+ */
+ public int getSessionAcknowledgeMode() {
+ return sessionAcknowledgeMode;
+ }
+
+ /**
+ * @param sessionAcknowledgeMode the sessionAcknowledgeMode to set
+ * @see org.springframework.jms.support.JmsAccessor#setSessionAcknowledgeMode(int)
+ */
+ public void setSessionAcknowledgeMode(int sessionAcknowledgeMode) {
+ this.sessionAcknowledgeMode = sessionAcknowledgeMode;
+ }
+
+ /**
+ * @return the subscriptionDurable
+ */
+ public boolean isSubscriptionDurable() {
+ return subscriptionDurable;
+ }
+
+ /**
+ * @param subscriptionDurable the subscriptionDurable to set
+ * @see org.springframework.jms.listener.AbstractMessageListenerContainer#setSubscriptionDurable(boolean)
+ */
+ public void setSubscriptionDurable(boolean subscriptionDurable) {
+ this.subscriptionDurable = subscriptionDurable;
+ }
+
+ public String getLocationURI() {
+ // TODO: Need to return a real URI
+ return getService() + "#" + getEndpoint();
+ }
+
+ public synchronized void start() throws Exception {
+ listenerContainer = createListenerContainer();
+ listenerContainer.setMessageListener(new SessionAwareMessageListener() {
+ public void onMessage(Message message, Session session) throws JMSException {
+ JmsConsumerEndpoint.this.onMessage(message, session);
+ }
+ });
+ listenerContainer.setAutoStartup(true);
+ listenerContainer.afterPropertiesSet();
+ super.start();
+ }
+
+ public synchronized void stop() throws Exception {
+ if (listenerContainer != null) {
+ listenerContainer.stop();
+ listenerContainer = null;
+ }
+ super.stop();
+ }
+
+ public void validate() throws DeploymentException {
+ // TODO: check service, endpoint
+ super.validate();
+ if (getConnectionFactory() == null) {
+ throw new DeploymentException("connectionFactory is required");
+ }
+ if (destination == null && destinationName == null) {
+ throw new DeploymentException("destination or destinationName is required");
+ }
+ if (!LISTENER_TYPE_DEFAULT.equals(listenerType) &&
+ !LISTENER_TYPE_SIMPLE.equals(listenerType) &&
+ !LISTENER_TYPE_SERVER.equals(listenerType)) {
+ throw new DeploymentException("listenerType must be default, simple or server");
+ }
+ if (TRANSACTED_XA.equals(transacted) &&
+ !LISTENER_TYPE_DEFAULT.equals(listenerType)) {
+ throw new DeploymentException("XA transactions are only supported on default listener");
+ }
+ if (!TRANSACTED_NONE.equals(transacted) &&
+ !TRANSACTED_JMS.equals(transacted) &&
+ !TRANSACTED_XA.equals(transacted)) {
+ throw new DeploymentException("transacted must be none, jms or xa");
+ }
+ }
+
+ protected AbstractMessageListenerContainer createListenerContainer() {
+ final AbstractMessageListenerContainer container;
+ if (LISTENER_TYPE_DEFAULT.equals(listenerType)) {
+ final DefaultMessageListenerContainer cont;
+ if (jms102) {
+ cont = new DefaultMessageListenerContainer102();
+ } else {
+ cont = new DefaultMessageListenerContainer();
+ }
+ cont.setCacheLevel(cacheLevel);
+ cont.setConcurrentConsumers(concurrentConsumers);
+ cont.setMaxMessagesPerTask(maxMessagesPerTask);
+ cont.setPubSubNoLocal(pubSubNoLocal);
+ cont.setReceiveTimeout(receiveTimeout);
+ cont.setRecoveryInterval(recoveryInterval);
+ if (TRANSACTED_XA.equals(transacted)) {
+ TransactionManager tm = (TransactionManager) getContext().getTransactionManager();
+ if (tm == null) {
+ throw new IllegalStateException("No TransactionManager available");
+ } else if (tm instanceof PlatformTransactionManager) {
+ cont.setTransactionManager((PlatformTransactionManager) tm);
+ } else {
+ cont.setTransactionManager(new JtaTransactionManager(tm));
+ }
+ } else if (TRANSACTED_JMS.equals(transacted)) {
+ if (jms102) {
+ cont.setTransactionManager(new JmsTransactionManager102(getConnectionFactory(), isPubSubDomain()));
+ } else {
+ cont.setTransactionManager(new JmsTransactionManager(getConnectionFactory()));
+ }
+ }
+ container = cont;
+ } else if (LISTENER_TYPE_SIMPLE.equals(listenerType)) {
+ final SimpleMessageListenerContainer cont;
+ if (jms102) {
+ cont = new SimpleMessageListenerContainer102();
+ } else {
+ cont = new SimpleMessageListenerContainer();
+ }
+ cont.setConcurrentConsumers(concurrentConsumers);
+ cont.setPubSubNoLocal(pubSubNoLocal);
+ cont.setTaskExecutor(null); // TODO: value ?
+ if (TRANSACTED_JMS.equals(transacted)) {
+ cont.setSessionTransacted(true);
+ }
+ container = cont;
+ } else if (LISTENER_TYPE_SERVER.equals(listenerType)) {
+ final ServerSessionMessageListenerContainer cont;
+ if (jms102) {
+ cont = new ServerSessionMessageListenerContainer102();
+ } else {
+ cont = new ServerSessionMessageListenerContainer();
+ }
+ cont.setMaxMessagesPerTask(maxMessagesPerTask > 0 ? maxMessagesPerTask : 1);
+ cont.setServerSessionFactory(serverSessionFactory);
+ if (TRANSACTED_JMS.equals(transacted)) {
+ cont.setSessionTransacted(true);
+ }
+ container = cont;
+ } else {
+ throw new IllegalStateException();
+ }
+ container.setAutoStartup(false);
+ container.setClientId(clientId);
+ container.setConnectionFactory(getConnectionFactory());
+ if (destination != null) {
+ container.setDestination(destination);
+ } else if (destinationName != null) {
+ container.setDestinationName(destinationName);
+ }
+ if (getDestinationResolver() != null) {
+ container.setDestinationResolver(getDestinationResolver());
+ }
+ if (subscriptionDurable) {
+ if (durableSubscriptionName == null) {
+ // Use unique name generated from this endpoint
+ durableSubscriptionName = getService() + "#" + getEndpoint();
+ }
+ container.setDurableSubscriptionName(durableSubscriptionName);
+ }
+ container.setExceptionListener(exceptionListener);
+ container.setMessageSelector(messageSelector);
+ container.setPubSubDomain(isPubSubDomain());
+ container.setSessionAcknowledgeMode(sessionAcknowledgeMode);
+ container.setSubscriptionDurable(subscriptionDurable);
+ return container;
+ }
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerEndpoint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.Message;
+import javax.jms.Session;
+
+public interface JmsConsumerMarshaler {
+
+ public interface JmsContext {
+ public Message getMessage();
+ }
+
+ JmsContext createContext(Message message, ComponentContext context) throws Exception;
+
+ MessageExchange createExchange(JmsContext context) throws Exception;
+
+ Message createOut(MessageExchange exchange,
+ NormalizedMessage outMsg,
+ Session session,
+ JmsContext context) throws Exception;
+
+ Message createFault(MessageExchange exchange,
+ Fault fault,
+ Session session,
+ JmsContext context) throws Exception;
+
+ Message createError(MessageExchange exchange,
+ Exception error,
+ Session session,
+ JmsContext context) throws Exception;
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsConsumerMarshaler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import java.util.Timer;
+
+import javax.jbi.management.DeploymentException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
+
+import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.executors.WorkManagerWrapper;
+import org.apache.servicemix.jms.JmsEndpointType;
+import org.jencks.SingletonEndpointFactory;
+import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
+
+/**
+ *
+ * @author gnodet
+ * @org.apache.xbean.XBean element="jca-consumer"
+ */
+public class JmsJcaConsumerEndpoint extends AbstractConsumerEndpoint implements JmsEndpointType {
+
+ private ResourceAdapter resourceAdapter;
+ private ActivationSpec activationSpec;
+ private BootstrapContext bootstrapContext;
+ private MessageEndpointFactory endpointFactory;
+
+ /**
+ * @return the bootstrapContext
+ */
+ public BootstrapContext getBootstrapContext() {
+ return bootstrapContext;
+ }
+
+ /**
+ * @param bootstrapContext the bootstrapContext to set
+ */
+ public void setBootstrapContext(BootstrapContext bootstrapContext) {
+ this.bootstrapContext = bootstrapContext;
+ }
+
+ /**
+ * @return the activationSpec
+ */
+ public ActivationSpec getActivationSpec() {
+ return activationSpec;
+ }
+
+ /**
+ * @param activationSpec the activationSpec to set
+ */
+ public void setActivationSpec(ActivationSpec activationSpec) {
+ this.activationSpec = activationSpec;
+ }
+
+ /**
+ * @return the resourceAdapter
+ */
+ public ResourceAdapter getResourceAdapter() {
+ return resourceAdapter;
+ }
+
+ /**
+ * @param resourceAdapter the resourceAdapter to set
+ */
+ public void setResourceAdapter(ResourceAdapter resourceAdapter) {
+ this.resourceAdapter = resourceAdapter;
+ }
+
+ public String getLocationURI() {
+ // TODO: Need to return a real URI
+ return getService() + "#" + getEndpoint();
+ }
+
+ public synchronized void start() throws Exception {
+ if (bootstrapContext == null) {
+ Executor executor = getServiceUnit().getComponent().getExecutor();
+ WorkManager wm = new WorkManagerWrapper(executor);
+ bootstrapContext = new SimpleBootstrapContext(wm);
+ }
+ resourceAdapter.start(bootstrapContext);
+ activationSpec.setResourceAdapter(resourceAdapter);
+ if (endpointFactory == null) {
+ TransactionManager tm = (TransactionManager) getContext().getTransactionManager();
+ endpointFactory = new SingletonEndpointFactory(new MessageListener() {
+ public void onMessage(Message message) {
+ try {
+ JmsJcaConsumerEndpoint.this.onMessage(message, null);
+ } catch (JMSException e) {
+ throw new ListenerExecutionFailedException("Unable to handle message", e);
+ }
+ }
+ }, tm);
+ }
+ resourceAdapter.endpointActivation(endpointFactory, activationSpec);
+ super.start();
+ }
+
+ public synchronized void stop() throws Exception {
+ resourceAdapter.endpointDeactivation(endpointFactory, activationSpec);
+ resourceAdapter.stop();
+ super.stop();
+ }
+
+ public void validate() throws DeploymentException {
+ super.validate();
+ if (resourceAdapter == null) {
+ throw new DeploymentException("resourceAdapter must be set");
+ }
+ if (activationSpec == null) {
+ throw new DeploymentException("activationSpec must be set");
+ }
+ }
+
+ protected static class SimpleBootstrapContext implements BootstrapContext {
+ private final WorkManager workManager;
+ public SimpleBootstrapContext(WorkManager workManager) {
+ this.workManager = workManager;
+ }
+ public Timer createTimer() throws UnavailableException {
+ throw new UnsupportedOperationException();
+ }
+ public WorkManager getWorkManager() {
+ return workManager;
+ }
+ public XATerminator getXATerminator() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsJcaConsumerEndpoint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.jms.JmsEndpointType;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.support.destination.DestinationResolver;
+
+/**
+ *
+ * @author gnodet
+ * @org.apache.xbean.XBean element="provider"
+ * @since 3.2
+ */
+public class JmsProviderEndpoint extends ProviderEndpoint implements JmsEndpointType {
+
+ private JmsProviderMarshaler marshaler = new DefaultProviderMarshaler();
+ private DestinationChooser destinationChooser = new SimpleDestinationChooser();
+ private JmsTemplate template;
+
+ private boolean jms102 = false;
+ private ConnectionFactory connectionFactory;
+ private boolean pubSubDomain = false;
+ private DestinationResolver destinationResolver;
+ private Destination destination;
+ private String destinationName;
+ private boolean messageIdEnabled = true;
+ private boolean messageTimestampEnabled = true;
+ private boolean pubSubNoLocal = false;
+ private long receiveTimeout = JmsTemplate.DEFAULT_RECEIVE_TIMEOUT;
+ private boolean explicitQosEnabled = false;
+ private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ private int priority = Message.DEFAULT_PRIORITY;
+ private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+
+ /**
+ * @return the destination
+ */
+ public Destination getDestination() {
+ return destination;
+ }
+
+ /**
+ * @param destination the destination to set
+ */
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @return the destinationName
+ */
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ /**
+ * @param destinationName the destinationName to set
+ */
+ public void setDestinationName(String destinationName) {
+ this.destinationName = destinationName;
+ }
+
+ /**
+ * @return the jms102
+ */
+ public boolean isJms102() {
+ return jms102;
+ }
+
+ /**
+ * @param jms102 the jms102 to set
+ */
+ public void setJms102(boolean jms102) {
+ this.jms102 = jms102;
+ }
+
+ /**
+ * @return the connectionFactory
+ */
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ /**
+ * @param connectionFactory the connectionFactory to set
+ */
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * @return the deliveryMode
+ */
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ /**
+ * @param deliveryMode the deliveryMode to set
+ */
+ public void setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ }
+
+ /**
+ * @return the destinationChooser
+ */
+ public DestinationChooser getDestinationChooser() {
+ return destinationChooser;
+ }
+
+ /**
+ * @param destinationChooser the destinationChooser to set
+ */
+ public void setDestinationChooser(DestinationChooser destinationChooser) {
+ if (destinationChooser == null) {
+ throw new NullPointerException("destinationChooser is null");
+ }
+ this.destinationChooser = destinationChooser;
+ }
+
+ /**
+ * @return the destinationResolver
+ */
+ public DestinationResolver getDestinationResolver() {
+ return destinationResolver;
+ }
+
+ /**
+ * @param destinationResolver the destinationResolver to set
+ */
+ public void setDestinationResolver(DestinationResolver destinationResolver) {
+ this.destinationResolver = destinationResolver;
+ }
+
+ /**
+ * @return the explicitQosEnabled
+ */
+ public boolean isExplicitQosEnabled() {
+ return explicitQosEnabled;
+ }
+
+ /**
+ * @param explicitQosEnabled the explicitQosEnabled to set
+ */
+ public void setExplicitQosEnabled(boolean explicitQosEnabled) {
+ this.explicitQosEnabled = explicitQosEnabled;
+ }
+
+ /**
+ * @return the marshaler
+ */
+ public JmsProviderMarshaler getMarshaler() {
+ return marshaler;
+ }
+
+ /**
+ * @param marshaler the marshaler to set
+ */
+ public void setMarshaler(JmsProviderMarshaler marshaler) {
+ if (marshaler == null) {
+ throw new NullPointerException("marshaler is null");
+ }
+ this.marshaler = marshaler;
+ }
+
+ /**
+ * @return the messageIdEnabled
+ */
+ public boolean isMessageIdEnabled() {
+ return messageIdEnabled;
+ }
+
+ /**
+ * @param messageIdEnabled the messageIdEnabled to set
+ */
+ public void setMessageIdEnabled(boolean messageIdEnabled) {
+ this.messageIdEnabled = messageIdEnabled;
+ }
+
+ /**
+ * @return the messageTimestampEnabled
+ */
+ public boolean isMessageTimestampEnabled() {
+ return messageTimestampEnabled;
+ }
+
+ /**
+ * @param messageTimestampEnabled the messageTimestampEnabled to set
+ */
+ public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
+ this.messageTimestampEnabled = messageTimestampEnabled;
+ }
+
+ /**
+ * @return the priority
+ */
+ public int getPriority() {
+ return priority;
+ }
+
+ /**
+ * @param priority the priority to set
+ */
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ /**
+ * @return the pubSubDomain
+ */
+ public boolean isPubSubDomain() {
+ return pubSubDomain;
+ }
+
+ /**
+ * @param pubSubDomain the pubSubDomain to set
+ */
+ public void setPubSubDomain(boolean pubSubDomain) {
+ this.pubSubDomain = pubSubDomain;
+ }
+
+ /**
+ * @return the pubSubNoLocal
+ */
+ public boolean isPubSubNoLocal() {
+ return pubSubNoLocal;
+ }
+
+ /**
+ * @param pubSubNoLocal the pubSubNoLocal to set
+ */
+ public void setPubSubNoLocal(boolean pubSubNoLocal) {
+ this.pubSubNoLocal = pubSubNoLocal;
+ }
+
+ /**
+ * @return the receiveTimeout
+ */
+ public long getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ /**
+ * @param receiveTimeout the receiveTimeout to set
+ */
+ public void setReceiveTimeout(long receiveTimeout) {
+ this.receiveTimeout = receiveTimeout;
+ }
+
+ /**
+ * @return the timeToLive
+ */
+ public long getTimeToLive() {
+ return timeToLive;
+ }
+
+ /**
+ * @param timeToLive the timeToLive to set
+ */
+ public void setTimeToLive(long timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ protected void processInOnly(final MessageExchange exchange, final NormalizedMessage in) throws Exception {
+ MessageCreator creator = new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ try {
+ Message message = marshaler.createMessage(exchange, in, session);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending message to: " + template.getDefaultDestinationName() + " message: " + message);
+ }
+ return message;
+ }
+ catch (Exception e) {
+ JMSException jmsEx = new JMSException("Failed to create JMS Message: " + e);
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+ }
+ };
+ Object dest = destinationChooser.chooseDestination(exchange, in);
+ if (dest instanceof Destination) {
+ template.send((Destination) dest, creator);
+ } else if (dest instanceof String) {
+ template.send((String) dest, creator);
+ } else {
+ template.send(creator);
+ }
+ }
+
+ protected void processInOut(MessageExchange exchange, NormalizedMessage in) throws Exception {
+
+ }
+
+ public synchronized void start() throws Exception {
+ template = createTemplate();
+ super.start();
+ }
+
+ public synchronized void stop() throws Exception {
+ super.stop();
+ }
+
+ public void validate() throws DeploymentException {
+ // TODO: check service, endpoint
+ super.validate();
+ if (getService() == null) {
+ throw new DeploymentException("service must be set");
+ }
+ if (getEndpoint() == null) {
+ throw new DeploymentException("endpoint must be set");
+ }
+ if (getConnectionFactory() == null) {
+ throw new DeploymentException("connectionFactory is required");
+ }
+ }
+
+ protected JmsTemplate createTemplate() {
+ JmsTemplate template;
+ if (isJms102()) {
+ template = new JmsTemplate102();
+ } else {
+ template = new JmsTemplate();
+ }
+ template.setConnectionFactory(getConnectionFactory());
+ if (getDestination() != null) {
+ template.setDefaultDestination(getDestination());
+ } else if (getDestinationName() != null) {
+ template.setDefaultDestinationName(getDestinationName());
+ }
+ template.setDeliveryMode(getDeliveryMode());
+ if (getDestinationResolver() != null) {
+ template.setDestinationResolver(getDestinationResolver());
+ }
+ template.setExplicitQosEnabled(isExplicitQosEnabled());
+ template.setMessageIdEnabled(isMessageIdEnabled());
+ template.setMessageTimestampEnabled(isMessageTimestampEnabled());
+ template.setPriority(getPriority());
+ template.setPubSubDomain(isPubSubDomain());
+ template.setPubSubNoLocal(isPubSubNoLocal());
+ template.setTimeToLive(getTimeToLive());
+ template.setReceiveTimeout(getReceiveTimeout());
+ return template;
+ }
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderEndpoint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.Message;
+import javax.jms.Session;
+
+public interface JmsProviderMarshaler {
+
+ Object getDestination(MessageExchange exchange);
+
+ Message createMessage(MessageExchange exchange, NormalizedMessage in, Session session) throws Exception;
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/JmsProviderMarshaler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java?view=auto&rev=507994
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java (added)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java Thu Feb 15 08:37:20 2007
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms.endpoint;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.Destination;
+
+/**
+ * A simple destination chooser which will use the value of the {@link #DESTINATION_KEY}
+ * property on the message exchange, or fall back to a default destination
+ *
+ * @version $Revision$
+ */
+public class SimpleDestinationChooser implements DestinationChooser {
+
+ public static final String DESTINATION_KEY = "org.apache.servicemix.jms.destination";
+
+ private Destination defaultDestination;
+ private String defaultDestinationName;
+
+ public SimpleDestinationChooser() {
+ }
+
+ public SimpleDestinationChooser(Destination defaultDestination) {
+ this.defaultDestination = defaultDestination;
+ }
+
+ public SimpleDestinationChooser(String defaultDestinationName) {
+ this.defaultDestinationName = defaultDestinationName;
+ }
+
+ public Object chooseDestination(MessageExchange exchange, Object message) {
+ Object property = null;
+ if (message instanceof NormalizedMessage) {
+ property = ((NormalizedMessage) message).getProperty(DESTINATION_KEY);
+ }
+ if (property instanceof Destination) {
+ return (Destination) property;
+ }
+ if (getDefaultDestination() != null) {
+ return getDefaultDestination();
+ }
+ return getDefaultDestinationName();
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public Destination getDefaultDestination() {
+ return defaultDestination;
+ }
+
+ public void setDefaultDestination(Destination defaultDestination) {
+ this.defaultDestination = defaultDestination;
+ }
+
+ public String getDefaultDestinationName() {
+ return defaultDestinationName;
+ }
+
+ public void setDefaultDestinationName(String defaultDestinationName) {
+ this.defaultDestinationName = defaultDestinationName;
+ }
+
+}
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoint/SimpleDestinationChooser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain