You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/08 14:23:52 UTC
svn commit: r563827 - in /incubator/servicemix/trunk/deployables:
bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/
bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/
bindingcomponents/servicemix-jms/s...
Author: gnodet
Date: Wed Aug 8 05:23:51 2007
New Revision: 563827
URL: http://svn.apache.org/viewvc?view=rev&rev=563827
Log:
SM-1023: correlationId and senderEndpoint properties are not set on jsr181 (using the proxy) and jms consumer endpoints
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java?view=diff&rev=563827&r1=563826&r2=563827
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java Wed Aug 8 05:23:51 2007
@@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Properties;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
@@ -41,6 +43,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.common.BaseLifeCycle;
+import org.apache.servicemix.common.EndpointComponentContext;
import org.apache.servicemix.common.ExchangeProcessor;
import org.apache.servicemix.soap.Context;
import org.apache.servicemix.soap.SoapFault;
@@ -60,10 +63,14 @@
protected JmsEndpoint endpoint;
protected Connection connection;
protected SoapHelper soapHelper;
+ protected ComponentContext context;
+ protected DeliveryChannel channel;
public AbstractJmsProcessor(JmsEndpoint endpoint) throws Exception {
this.endpoint = endpoint;
this.soapHelper = new SoapHelper(endpoint);
+ this.context = new EndpointComponentContext(endpoint);
+ this.channel = context.getDeliveryChannel();
}
public void start() throws Exception {
@@ -166,7 +173,7 @@
return soapHelper.createContext();
}
- protected MessageExchange toNMS(Message message, Context context) throws Exception {
+ protected MessageExchange toNMS(Message message, Context ctx) throws Exception {
InputStream is = null;
if (message instanceof TextMessage) {
is = new ByteArrayInputStream(((TextMessage) message).getText().getBytes());
@@ -180,15 +187,15 @@
}
String contentType = message.getStringProperty(CONTENT_TYPE);
SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType);
- context.setInMessage(soap);
- context.setProperty(Message.class.getName(), message);
- MessageExchange exchange = soapHelper.onReceive(context);
+ ctx.setInMessage(soap);
+ ctx.setProperty(Message.class.getName(), message);
+ MessageExchange exchange = soapHelper.onReceive(ctx);
// TODO: copy protocol messages
//inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
return exchange;
}
- protected Message fromNMSResponse(MessageExchange exchange, Context context, Session session) throws Exception {
+ protected Message fromNMSResponse(MessageExchange exchange, Context ctx, Session session) throws Exception {
Message response = null;
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Exception e = exchange.getError();
@@ -200,14 +207,14 @@
Fault jbiFault = exchange.getFault();
if (jbiFault != null) {
SoapFault fault = new SoapFault(SoapFault.RECEIVER, null, null, null, jbiFault.getContent());
- SoapMessage soapFault = soapHelper.onFault(context, fault);
+ SoapMessage soapFault = soapHelper.onFault(ctx, fault);
TextMessage txt = session.createTextMessage();
fromNMS(soapFault, txt, (Map) jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS));
response = txt;
} else {
NormalizedMessage outMsg = exchange.getMessage("out");
if (outMsg != null) {
- SoapMessage out = soapHelper.onReply(context, outMsg);
+ SoapMessage out = soapHelper.onReply(ctx, outMsg);
TextMessage txt = session.createTextMessage();
fromNMS(out, txt, (Map) outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS));
response = txt;
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java?view=diff&rev=563827&r1=563826&r2=563827
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java Wed Aug 8 05:23:51 2007
@@ -18,7 +18,6 @@
import java.util.Map;
-import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
@@ -38,7 +37,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.common.AsyncBaseLifeCycle;
import org.apache.servicemix.common.BaseLifeCycle;
import org.apache.servicemix.jms.AbstractJmsProcessor;
import org.apache.servicemix.jms.JmsEndpoint;
@@ -54,7 +52,6 @@
private static final Log LOG = LogFactory.getLog(JcaConsumerProcessor.class);
protected Map pendingMessages = new ConcurrentHashMap();
- protected DeliveryChannel channel;
protected ResourceAdapter resourceAdapter;
protected MessageEndpointFactory endpointFactory;
protected ActivationSpec activationSpec;
@@ -67,9 +64,7 @@
}
public void start() throws Exception {
- AsyncBaseLifeCycle lf = (AsyncBaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
- channel = lf.getContext().getDeliveryChannel();
- transactionManager = (TransactionManager) lf.getContext().getTransactionManager();
+ transactionManager = (TransactionManager) context.getTransactionManager();
endpointFactory = new SingletonEndpointFactory(this, transactionManager);
bootstrapContext = endpoint.getBootstrapContext();
if (bootstrapContext == null) {
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java?view=diff&rev=563827&r1=563826&r2=563827
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java Wed Aug 8 05:23:51 2007
@@ -18,7 +18,6 @@
import java.util.Map;
-import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
@@ -32,7 +31,6 @@
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import org.apache.servicemix.common.BaseLifeCycle;
import org.apache.servicemix.jms.AbstractJmsProcessor;
import org.apache.servicemix.jms.JmsEndpoint;
import org.apache.servicemix.soap.Context;
@@ -43,7 +41,6 @@
protected Destination destination;
protected MessageConsumer consumer;
protected Map pendingMessages;
- protected DeliveryChannel channel;
public MultiplexingConsumerProcessor(JmsEndpoint endpoint) throws Exception {
super(endpoint);
@@ -66,7 +63,6 @@
}
}
pendingMessages = new ConcurrentHashMap();
- channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
}
@@ -94,8 +90,7 @@
// TODO: copy protocol messages
//inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
pendingMessages.put(exchange.getExchangeId(), context);
- BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
- lf.sendConsumerExchange(exchange, MultiplexingConsumerProcessor.this.endpoint);
+ channel.send(exchange);
} catch (Throwable e) {
log.error("Error while handling jms message", e);
}
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java?view=diff&rev=563827&r1=563826&r2=563827
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java Wed Aug 8 05:23:51 2007
@@ -16,7 +16,6 @@
*/
package org.apache.servicemix.jms.standard;
-import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jms.Destination;
@@ -36,7 +35,6 @@
protected Session session;
protected Destination destination;
- protected DeliveryChannel channel;
protected AtomicBoolean running = new AtomicBoolean(false);
public StandardConsumerProcessor(JmsEndpoint endpoint) throws Exception {
@@ -52,7 +50,6 @@
throw new IllegalStateException("No destination provided");
}
}
- channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
synchronized (running) {
endpoint.getServiceUnit().getComponent().getExecutor().execute(new Runnable() {
public void run() {
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java?view=diff&rev=563827&r1=563826&r2=563827
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java Wed Aug 8 05:23:51 2007
@@ -19,11 +19,15 @@
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
+import org.apache.servicemix.common.Endpoint;
+import org.apache.servicemix.common.ServiceMixComponent;
+
/**
* This class is a wrapper around an existing DeliveryChannel
* that will be given to service engine endpoints so that
@@ -34,6 +38,7 @@
*/
public class EndpointDeliveryChannel implements DeliveryChannel {
+ private static ThreadLocal<Endpoint> endpoint = new ThreadLocal<Endpoint>();
private final DeliveryChannel channel;
public EndpointDeliveryChannel(DeliveryChannel channel) {
@@ -60,8 +65,8 @@
return channel.createExchangeFactory(interfaceName);
}
- public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) {
- return channel.createExchangeFactory(endpoint);
+ public MessageExchangeFactory createExchangeFactory(ServiceEndpoint ep) {
+ return channel.createExchangeFactory(ep);
}
public MessageExchangeFactory createExchangeFactoryForService(QName serviceName) {
@@ -72,14 +77,30 @@
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
throw new UnsupportedOperationException("Asynchronous send of active exchanges are not supported");
}
+ prepare(exchange);
channel.send(exchange);
}
public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
+ prepare(exchange);
return channel.sendSync(exchange, timeout);
}
public boolean sendSync(MessageExchange exchange) throws MessagingException {
+ prepare(exchange);
return channel.sendSync(exchange);
}
+
+ protected void prepare(MessageExchange exchange) throws MessagingException {
+ Endpoint ep = this.endpoint.get();
+ if (ep != null && exchange.getStatus() == ExchangeStatus.ACTIVE && exchange.getRole() == Role.CONSUMER) {
+ ServiceMixComponent comp = ep.getServiceUnit().getComponent();
+ comp.prepareConsumerExchange(exchange, ep);
+ }
+ }
+
+ public static void setEndpoint(Endpoint ep) {
+ endpoint.set(ep);
+ }
+
}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java?view=diff&rev=563827&r1=563826&r2=563827
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java Wed Aug 8 05:23:51 2007
@@ -105,6 +105,7 @@
}
msg.setAttachments(attachments);
}
+ EndpointDeliveryChannel.setEndpoint(endpoint);
JBIContext.setMessageExchange(exchange);
if (isInAndOut(exchange)) {
// TODO ?
@@ -113,6 +114,7 @@
c.receive(ctx, msg);
} finally {
JBIContext.setMessageExchange(null);
+ EndpointDeliveryChannel.setEndpoint(null);
}
c.close();