You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/10/20 11:48:00 UTC

svn commit: r586692 - in /incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src: main/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/beans/

Author: gnodet
Date: Sat Oct 20 02:48:00 2007
New Revision: 586692

URL: http://svn.apache.org/viewvc?rev=586692&view=rev
Log:
SM-1110: ServiceMix is not sending a response back to the calling Service in an In-Out Message Exchange

Added:
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
Modified:
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java

Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=586692&r1=586691&r2=586692&view=diff
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Sat Oct 20 02:48:00 2007
@@ -19,21 +19,32 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Map;
+import java.util.MissingResourceException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.logging.Logger;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
+import javax.jbi.JBIException;
 import javax.jbi.component.ComponentContext;
+import javax.jbi.management.MBeanNames;
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.management.MBeanServer;
+import javax.naming.InitialContext;
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.DocumentFragment;
 
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.commons.jexl.Expression;
@@ -75,7 +86,7 @@
     private String beanClassName;
     private MethodInvocationStrategy methodInvocationStrategy;
     private org.apache.servicemix.expression.Expression correlationExpression;
-    
+
     private Map<String, Holder> exchanges = new ConcurrentHashMap<String, Holder>();
     private Map<Object, Request> requests = new ConcurrentHashMap<Object, Request>();
     private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
@@ -202,7 +213,7 @@
             throw new IllegalStateException("Unknown role: " + exchange.getRole());
         }
     }
-    
+
     protected void onProviderExchange(MessageExchange exchange) throws Exception {
         Object corId = getCorrelation(exchange);
         Request req = requests.get(corId);
@@ -259,7 +270,7 @@
             currentRequest.set(null);
         }
     }
-    
+
     protected void onConsumerExchange(MessageExchange exchange) throws Exception {
         Object corId = exchange.getExchangeId();
         Request req = requests.remove(corId);
@@ -282,7 +293,7 @@
         checkEndOfRequest(req, corId);
         currentRequest.set(null);
     }
-    
+
     protected Object getCorrelation(MessageExchange exchange) throws MessagingException {
         return getCorrelationExpression().evaluate(exchange, exchange.getMessage("in"));
     }
@@ -321,6 +332,8 @@
      * @param target the bean to be injected
      */
     protected void injectBean(final Object target) {
+        final PojoContext ctx = new PojoContext();
+        final DeliveryChannel ch = ctx.channel;
         // Inject fields
         ReflectionUtils.doWithFields(target.getClass(), new ReflectionUtils.FieldCallback() {
             public void doWith(Field f) throws IllegalArgumentException, IllegalAccessException {
@@ -330,9 +343,9 @@
                 }
                 if (f.getAnnotation(Resource.class) != null) {
                     if (ComponentContext.class.isAssignableFrom(f.getType())) {
-                        ReflectionUtils.setField(f, target, context);
+                        ReflectionUtils.setField(f, target, ctx);
                     } else if (DeliveryChannel.class.isAssignableFrom(f.getType())) {
-                        ReflectionUtils.setField(f, target, channel);
+                        ReflectionUtils.setField(f, target, ch);
                     }
                 }
             }
@@ -368,7 +381,7 @@
             }
         });
     }
-    
+
     /**
      * Used by POJOs acting as a consumer
      * @param uri
@@ -380,7 +393,7 @@
             InOut me = getExchangeFactory().createInOutExchange();
             URIResolver.configureExchange(me, getServiceUnit().getComponent().getComponentContext(), uri);
             MessageUtil.transferTo(message, me, "in");
-            final Holder h = new Holder(); 
+            final Holder h = new Holder();
             requests.put(me.getExchangeId(), currentRequest.get());
             exchanges.put(me.getExchangeId(), h);
             BeanEndpoint.this.send(me);
@@ -389,7 +402,7 @@
             throw new RuntimeException(e);
         }
     }
-    
+
     protected void checkEndOfRequest(Request request, Object corId) {
         if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
             ReflectionUtils.callLifecycleMethod(request.getBean(), PreDestroy.class);
@@ -430,5 +443,138 @@
      */
     public void setCorrelationExpression(org.apache.servicemix.expression.Expression correlationExpression) {
         this.correlationExpression = correlationExpression;
+    }
+
+    protected class PojoContext implements ComponentContext { // ComponentContext given to beans by injectBean; forwards every call to the endpoint's context except getDeliveryChannel()
+
+        private DeliveryChannel channel = new PojoChannel(); // wrapping channel exposed to the bean in place of the endpoint's own channel
+
+        public ServiceEndpoint activateEndpoint(QName qName, String s) throws JBIException {
+            return context.activateEndpoint(qName, s);
+        }
+
+        public void deactivateEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException {
+            context.deactivateEndpoint(serviceEndpoint);
+        }
+
+        public void registerExternalEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException {
+            context.registerExternalEndpoint(serviceEndpoint);
+        }
+
+        public void deregisterExternalEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException {
+            context.deregisterExternalEndpoint(serviceEndpoint);
+        }
+
+        public ServiceEndpoint resolveEndpointReference(DocumentFragment documentFragment) {
+            return context.resolveEndpointReference(documentFragment);
+        }
+
+        public String getComponentName() {
+            return context.getComponentName();
+        }
+
+        public DeliveryChannel getDeliveryChannel() throws MessagingException { // the only method that does not delegate to context
+            return channel; // the PojoChannel wrapper created above, not context.getDeliveryChannel()
+        }
+
+        public ServiceEndpoint getEndpoint(QName qName, String s) {
+            return context.getEndpoint(qName, s);
+        }
+
+        public Document getEndpointDescriptor(ServiceEndpoint serviceEndpoint) throws JBIException {
+            return context.getEndpointDescriptor(serviceEndpoint);
+        }
+
+        public ServiceEndpoint[] getEndpoints(QName qName) {
+            return context.getEndpoints(qName);
+        }
+
+        public ServiceEndpoint[] getEndpointsForService(QName qName) {
+            return context.getEndpointsForService(qName);
+        }
+
+        public ServiceEndpoint[] getExternalEndpoints(QName qName) {
+            return context.getExternalEndpoints(qName);
+        }
+
+        public ServiceEndpoint[] getExternalEndpointsForService(QName qName) {
+            return context.getExternalEndpointsForService(qName);
+        }
+
+        public String getInstallRoot() {
+            return context.getInstallRoot();
+        }
+
+        public Logger getLogger(String s, String s1) throws MissingResourceException, JBIException {
+            return context.getLogger(s, s1);
+        }
+
+        public MBeanNames getMBeanNames() {
+            return context.getMBeanNames();
+        }
+
+        public MBeanServer getMBeanServer() {
+            return context.getMBeanServer();
+        }
+
+        public InitialContext getNamingContext() {
+            return context.getNamingContext();
+        }
+
+        public Object getTransactionManager() {
+            return context.getTransactionManager();
+        }
+
+        public String getWorkspaceRoot() {
+            return context.getWorkspaceRoot();
+        }
+    }
+
+    protected class PojoChannel implements DeliveryChannel { // DeliveryChannel wrapper: forwards to BeanEndpoint.this.channel; send() also records outgoing consumer exchanges in requests
+
+        public void close() throws MessagingException {
+            BeanEndpoint.this.channel.close();
+        }
+
+        public MessageExchangeFactory createExchangeFactory() {
+            return BeanEndpoint.this.channel.createExchangeFactory();
+        }
+
+        public MessageExchangeFactory createExchangeFactory(QName qName) {
+            return BeanEndpoint.this.channel.createExchangeFactory(qName);
+        }
+
+        public MessageExchangeFactory createExchangeFactoryForService(QName qName) {
+            return BeanEndpoint.this.channel.createExchangeFactoryForService(qName);
+        }
+
+        public MessageExchangeFactory createExchangeFactory(ServiceEndpoint serviceEndpoint) {
+            return BeanEndpoint.this.channel.createExchangeFactory(serviceEndpoint);
+        }
+
+        public MessageExchange accept() throws MessagingException {
+            return BeanEndpoint.this.channel.accept();
+        }
+
+        public MessageExchange accept(long l) throws MessagingException {
+            return BeanEndpoint.this.channel.accept(l);
+        }
+
+        public void send(MessageExchange messageExchange) throws MessagingException {
+            if (messageExchange.getRole() == MessageExchange.Role.CONSUMER // only exchanges on which the sender is the consumer ...
+                    && messageExchange.getStatus() == ExchangeStatus.ACTIVE) { // ... and that are still ACTIVE are recorded
+                requests.put(messageExchange.getExchangeId(), currentRequest.get()); // keyed by exchange id, the key onConsumerExchange removes; NOTE(review): requests is a ConcurrentHashMap (no null values) and currentRequest is a ThreadLocal - confirm it is always set on the calling thread
+            }
+            BeanEndpoint.this.channel.send(messageExchange); // the exchange is always forwarded, whether or not it was recorded
+        }
+
+        public boolean sendSync(MessageExchange messageExchange) throws MessagingException { // plain delegation: nothing is added to requests for sendSync
+            return BeanEndpoint.this.channel.sendSync(messageExchange);
+        }
+
+        public boolean sendSync(MessageExchange messageExchange, long l) throws MessagingException {
+            return BeanEndpoint.this.channel.sendSync(messageExchange, l);
+        }
+
+    }
 }

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java?rev=586692&view=auto
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java Sat Oct 20 02:48:00 2007
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.beans.ConsumerListener;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+
+public class ConsumerListenerTest extends TestCase { // sends one InOut exchange to a bean endpoint backed by ConsumerListener and checks it does not come back as an error or fault
+
+    protected JBIContainer jbi;
+
+    protected void setUp() throws Exception { // builds and initialises an embedded container; NOTE(review): this file defines no tearDown, so the container is not released here - confirm whether that is intended
+        jbi = new JBIContainer();
+        jbi.setEmbedded(true);
+        jbi.init();
+    }
+
+    public void test() throws Exception {
+        BeanComponent bc = new BeanComponent();
+        BeanEndpoint ep = new BeanEndpoint();
+        ep.setService(new QName("bean"));
+        ep.setEndpoint("endpoint");
+        ep.setBeanType(ConsumerListener.class); // the bean under test; ConsumerListener addresses its own exchanges to the "echo" service
+        bc.setEndpoints(new BeanEndpoint[] {ep });
+        jbi.activateComponent(bc, "servicemix-bean");
+
+        EchoComponent echo = new EchoComponent(new QName("echo"), "endpoint"); // target of the exchange ConsumerListener creates
+        jbi.activateComponent(echo, "echo");
+
+        jbi.start();
+
+        ServiceMixClient client = new DefaultServiceMixClient(jbi);
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("bean")); // addressed to the bean endpoint configured above
+        me.setOperation(new QName("receive"));
+        NormalizedMessage nm = me.getInMessage();
+        nm.setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(me);
+        assertExchangeWorked(me); // the test asserts only the absence of an error/fault, not the content of the reply
+        client.done(me);
+    }
+
+    protected void assertExchangeWorked(MessageExchange me) throws Exception { // rethrows the exchange's error if present, otherwise fails on ERROR status or on a fault message
+        if (me.getStatus() == ExchangeStatus.ERROR) {
+            if (me.getError() != null) {
+                throw me.getError(); // surface the original exception so the test report shows its stack trace
+            } else {
+                fail("Received ERROR status");
+            }
+        } else if (me.getFault() != null) {
+            fail("Received fault: " + new SourceTransformer().toString(me.getFault().getContent()));
+        }
+    }
+
+}
\ No newline at end of file

Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java?rev=586692&r1=586691&r2=586692&view=diff
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java Sat Oct 20 02:48:00 2007
@@ -18,10 +18,12 @@
 
 import javax.annotation.Resource;
 import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
 
 import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.jbi.util.MessageUtil;
@@ -32,14 +34,28 @@
     private DeliveryChannel channel;
 
     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-        MessageExchangeFactory factory = channel.createExchangeFactory();
-        InOut io = factory.createInOutExchange();
-        try {
-            MessageUtil.transferInToIn(exchange, io);
-        } catch (MessagingException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new MessagingException(e);
+        if (exchange.getRole() == MessageExchange.Role.CONSUMER) { // exchange on which this bean is the consumer
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                MessageExchange io = (MessageExchange) exchange.getProperty("exchange"); // the incoming exchange stored under "exchange" by the provider branch below
+                MessageUtil.transferOutToOut(exchange, io);
+                io.setProperty("exchange", exchange); // link back so the two exchanges reference each other
+                channel.send(io);
+            } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+                MessageExchange io = (MessageExchange) exchange.getProperty("exchange");
+                io.setStatus(ExchangeStatus.DONE); // propagate DONE to the linked exchange
+                channel.send(io);
+            }
+        } else { // any other role: an exchange delivered to this bean
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                MessageExchangeFactory factory = channel.createExchangeFactory();
+                InOut io = factory.createInOutExchange();
+                MessageUtil.transferInToIn(exchange, io);
+                io.setService(new QName("echo")); // the new exchange is addressed to the "echo" service
+                io.setProperty("exchange", exchange); // remember the incoming exchange; read back in the consumer branch above
+                channel.send(io);
+            } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+                // Do nothing
+            }
         }
     }