You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/08 14:29:26 UTC

svn commit: r563831 - in /incubator/servicemix/branches/servicemix-3.1/deployables: bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/ bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/ bindingcomponent...

Author: gnodet
Date: Wed Aug  8 05:29:25 2007
New Revision: 563831

URL: http://svn.apache.org/viewvc?view=rev&rev=563831
Log:
SM-1023: correlationId and senderEndpoint properties are not set on jsr181 (using the proxy) and jms consumer endpoints

Modified:
    incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
    incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java
    incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
    incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
    incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java
    incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java?view=diff&rev=563831&r1=563830&r2=563831
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java Wed Aug  8 05:29:25 2007
@@ -24,6 +24,8 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
@@ -41,6 +43,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.common.BaseLifeCycle;
+import org.apache.servicemix.common.EndpointComponentContext;
 import org.apache.servicemix.common.ExchangeProcessor;
 import org.apache.servicemix.soap.Context;
 import org.apache.servicemix.soap.SoapFault;
@@ -60,10 +63,14 @@
     protected JmsEndpoint endpoint;
     protected Connection connection;
     protected SoapHelper soapHelper;
+    protected ComponentContext context;
+    protected DeliveryChannel channel;
 
     public AbstractJmsProcessor(JmsEndpoint endpoint) throws Exception {
         this.endpoint = endpoint;
         this.soapHelper = new SoapHelper(endpoint);
+        this.context = new EndpointComponentContext(endpoint);
+        this.channel = context.getDeliveryChannel();
     }
 
     public void start() throws Exception {
@@ -166,7 +173,7 @@
         return soapHelper.createContext();
     }
     
-    protected MessageExchange toNMS(Message message, Context context) throws Exception {
+    protected MessageExchange toNMS(Message message, Context ctx) throws Exception {
         InputStream is = null;
         if (message instanceof TextMessage) {
             is = new ByteArrayInputStream(((TextMessage) message).getText().getBytes());
@@ -180,15 +187,15 @@
         }
         String contentType = message.getStringProperty(CONTENT_TYPE);
         SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType);
-        context.setInMessage(soap);
-        context.setProperty(Message.class.getName(), message);
-        MessageExchange exchange = soapHelper.onReceive(context);
+        ctx.setInMessage(soap);
+        ctx.setProperty(Message.class.getName(), message);
+        MessageExchange exchange = soapHelper.onReceive(ctx);
         // TODO: copy protocol messages
         //inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
         return exchange;
     }
     
-    protected Message fromNMSResponse(MessageExchange exchange, Context context, Session session) throws Exception {
+    protected Message fromNMSResponse(MessageExchange exchange, Context ctx, Session session) throws Exception {
         Message response = null;
         if (exchange.getStatus() == ExchangeStatus.ERROR) {
             Exception e = exchange.getError();
@@ -200,14 +207,14 @@
             Fault jbiFault = exchange.getFault(); 
             if (jbiFault != null) {
                 SoapFault fault = new SoapFault(SoapFault.RECEIVER, null, null, null, jbiFault.getContent());
-                SoapMessage soapFault = soapHelper.onFault(context, fault);
+                SoapMessage soapFault = soapHelper.onFault(ctx, fault);
                 TextMessage txt = session.createTextMessage();
                 fromNMS(soapFault, txt, (Map) jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS));
                 response = txt;
             } else {
                 NormalizedMessage outMsg = exchange.getMessage("out");
                 if (outMsg != null) {
-                    SoapMessage out = soapHelper.onReply(context, outMsg);
+                    SoapMessage out = soapHelper.onReply(ctx, outMsg);
                     TextMessage txt = session.createTextMessage();
                     fromNMS(out, txt, (Map) outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS));
                     response = txt;

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java?view=diff&rev=563831&r1=563830&r2=563831
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaConsumerProcessor.java Wed Aug  8 05:29:25 2007
@@ -18,7 +18,6 @@
 
 import java.util.Map;
 
-import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
@@ -36,7 +35,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.common.AsyncBaseLifeCycle;
 import org.apache.servicemix.common.BaseLifeCycle;
 import org.apache.servicemix.jms.AbstractJmsProcessor;
 import org.apache.servicemix.jms.JmsEndpoint;
@@ -54,7 +52,6 @@
     private static final Log log = LogFactory.getLog(JcaConsumerProcessor.class);
 
     protected Map pendingMessages = new ConcurrentHashMap();
-    protected DeliveryChannel channel;
     protected ResourceAdapter resourceAdapter;
     protected MessageEndpointFactory endpointFactory;
     protected ActivationSpec activationSpec;
@@ -67,9 +64,7 @@
     }
 
     public void start() throws Exception {
-        AsyncBaseLifeCycle lf = (AsyncBaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
-        channel = lf.getContext().getDeliveryChannel();
-        transactionManager = (TransactionManager) lf.getContext().getTransactionManager();
+        transactionManager = (TransactionManager) context.getTransactionManager();
         endpointFactory = new SingletonEndpointFactory(this, transactionManager);
         bootstrapContext = endpoint.getBootstrapContext();
         if (bootstrapContext == null) {

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java?view=diff&rev=563831&r1=563830&r2=563831
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java Wed Aug  8 05:29:25 2007
@@ -18,7 +18,6 @@
 
 import java.util.Map;
 
-import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
@@ -30,7 +29,6 @@
 import javax.jms.Session;
 import javax.naming.InitialContext;
 
-import org.apache.servicemix.common.BaseLifeCycle;
 import org.apache.servicemix.jms.AbstractJmsProcessor;
 import org.apache.servicemix.jms.JmsEndpoint;
 import org.apache.servicemix.soap.Context;
@@ -43,7 +41,6 @@
     protected Destination destination;
     protected MessageConsumer consumer;
     protected Map pendingMessages;
-    protected DeliveryChannel channel;
 
     public MultiplexingConsumerProcessor(JmsEndpoint endpoint) throws Exception {
         super(endpoint);
@@ -66,7 +63,6 @@
             }
         }
         pendingMessages = new ConcurrentHashMap();
-        channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
         consumer = session.createConsumer(destination);
         consumer.setMessageListener(this);
     }
@@ -94,8 +90,7 @@
                     // TODO: copy protocol messages
                     //inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(message));
                     pendingMessages.put(exchange.getExchangeId(), context);
-                    BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
-                    lf.sendConsumerExchange(exchange, MultiplexingConsumerProcessor.this.endpoint);
+                    channel.send(exchange);
                 } catch (Throwable e) {
                     log.error("Error while handling jms message", e);
                 }

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java?view=diff&rev=563831&r1=563830&r2=563831
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java Wed Aug  8 05:29:25 2007
@@ -16,7 +16,6 @@
  */
 package org.apache.servicemix.jms.standard;
 
-import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 import javax.jms.Destination;
@@ -36,7 +35,6 @@
 
     protected Session session;
     protected Destination destination;
-    protected DeliveryChannel channel;
     protected AtomicBoolean running = new AtomicBoolean(false);
 
     public StandardConsumerProcessor(JmsEndpoint endpoint) throws Exception {
@@ -52,7 +50,6 @@
                 throw new IllegalStateException("No destination provided");
             }
         }
-        channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
         synchronized (running) {
             endpoint.getServiceUnit().getComponent().getExecutor().execute(new Runnable() {
                 public void run() {

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java?view=diff&rev=563831&r1=563830&r2=563831
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/EndpointDeliveryChannel.java Wed Aug  8 05:29:25 2007
@@ -19,11 +19,15 @@
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
 import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
 
+import org.apache.servicemix.common.Endpoint;
+import org.apache.servicemix.common.ServiceMixComponent;
+
 /**
  * This class is a wrapper around an existing DeliveryChannel
  * that will be given to service engine endpoints so that
@@ -34,6 +38,7 @@
  */
 public class EndpointDeliveryChannel implements DeliveryChannel {
 
+    private static ThreadLocal<Endpoint> endpoint = new ThreadLocal<Endpoint>();
     private final DeliveryChannel channel;
     
     public EndpointDeliveryChannel(DeliveryChannel channel) {
@@ -60,8 +65,8 @@
         return channel.createExchangeFactory(interfaceName);
     }
 
-    public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) {
-        return channel.createExchangeFactory(endpoint);
+    public MessageExchangeFactory createExchangeFactory(ServiceEndpoint ep) {
+        return channel.createExchangeFactory(ep);
     }
 
     public MessageExchangeFactory createExchangeFactoryForService(QName serviceName) {
@@ -72,14 +77,30 @@
         if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
             throw new UnsupportedOperationException("Asynchronous send of active exchanges are not supported");
         }
+        prepare(exchange);
         channel.send(exchange);
     }
 
     public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
+        prepare(exchange);
         return channel.sendSync(exchange, timeout);
     }
 
     public boolean sendSync(MessageExchange exchange) throws MessagingException {
+        prepare(exchange);
         return channel.sendSync(exchange);
     }
+    
+    protected void prepare(MessageExchange exchange) throws MessagingException {
+        Endpoint ep = this.endpoint.get();
+        if (ep != null && exchange.getStatus() == ExchangeStatus.ACTIVE && exchange.getRole() == Role.CONSUMER) {
+            ServiceMixComponent comp = ep.getServiceUnit().getComponent();
+            comp.prepareConsumerExchange(exchange, ep);
+        }
+    }
+    
+    public static void setEndpoint(Endpoint ep) {
+        endpoint.set(ep);
+    }
+    
 }

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java?view=diff&rev=563831&r1=563830&r2=563831
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/Jsr181ExchangeProcessor.java Wed Aug  8 05:29:25 2007
@@ -105,6 +105,7 @@
             }
             msg.setAttachments(attachments);
         }
+        EndpointDeliveryChannel.setEndpoint(endpoint);
         JBIContext.setMessageExchange(exchange);
         if (isInAndOut(exchange)) {
             
@@ -113,6 +114,7 @@
             c.receive(ctx, msg);
         } finally {
             JBIContext.setMessageExchange(null);
+            EndpointDeliveryChannel.setEndpoint(null);
         }
         c.close();