You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/02/17 13:22:51 UTC

svn commit: r744995 - in /servicemix/components/engines/servicemix-bean/trunk: ./ src/main/java/org/apache/servicemix/bean/ src/main/java/org/apache/servicemix/bean/pojos/ src/main/java/org/apache/servicemix/bean/support/ src/test/java/org/apache/servi...

Author: gertv
Date: Tue Feb 17 12:22:51 2009
New Revision: 744995

URL: http://svn.apache.org/viewvc?rev=744995&view=rev
Log:
SMXCOMP-20: BeanEndpoint.requests map leaks a request when sending in-only mep with seda flow to a TransformBeanSupport-extended bean

Added:
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java   (with props)
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java   (with props)
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java   (with props)
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java   (with props)
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java   (with props)
Modified:
    servicemix/components/engines/servicemix-bean/trunk/pom.xml
    servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java
    servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
    servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java
    servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties

Modified: servicemix/components/engines/servicemix-bean/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/pom.xml?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/pom.xml (original)
+++ servicemix/components/engines/servicemix-bean/trunk/pom.xml Tue Feb 17 12:22:51 2009
@@ -118,6 +118,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.14</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java Tue Feb 17 12:22:51 2009
@@ -40,7 +40,7 @@
  * @org.apache.xbean.XBean element="component" description="Bean Component"
  */
 public class BeanComponent extends DefaultComponent implements ApplicationContextAware {
-
+    
     private BeanEndpoint[] endpoints;
     private String[] searchPackages;
     private ApplicationContext applicationContext;

Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Tue Feb 17 12:22:51 2009
@@ -75,6 +75,11 @@
  * @org.apache.xbean.XBean element="endpoint"
  */
 public class BeanEndpoint extends ProviderEndpoint implements ApplicationContextAware {
+    
+    /**
+     * Property name for the correlation id that is being set on exchanges by the BeanEndpoint 
+     */
+    public static final String CORRELATION_ID = BeanEndpoint.class.getName().replaceAll("\\.", "_") + "_correlation";
 
     private ApplicationContext applicationContext;
     private String beanName;
@@ -102,7 +107,7 @@
     public void start() throws Exception {
         super.start();
         if (serviceEndpoint == null) {
-        	serviceEndpoint = getContext().getEndpoint(getService(), getEndpoint());
+            serviceEndpoint = getContext().getEndpoint(getService(), getEndpoint());
         }
         Object pojo = getBean();
         if (pojo != null) {
@@ -216,7 +221,6 @@
     }
 
     protected void onProviderExchange(MessageExchange exchange) throws Exception {
-        Object corId = getCorrelation(exchange);
         Request req = getOrCreateCurrentRequest(exchange);
         currentRequest.set(req);
         synchronized (req) {
@@ -257,14 +261,16 @@
                     }
                 }
             }
-            checkEndOfRequest(req, corId);
+            checkEndOfRequest(req);
             currentRequest.set(null);
         }
     }
 
     protected Request getOrCreateCurrentRequest(MessageExchange exchange) throws ClassNotFoundException, InstantiationException, IllegalAccessException, MessagingException {
-        Object corId = getCorrelation(exchange);
-        Request req = requests.get(corId);
+        if (currentRequest.get() != null) {
+            return currentRequest.get();
+        }
+        Request req = getRequest(exchange);
         if (req == null) {
             Object pojo = getBean();
             if (pojo == null) {
@@ -272,32 +278,40 @@
                 injectBean(pojo);
                 ReflectionUtils.callLifecycleMethod(pojo, PostConstruct.class);
             }
-            req = new Request(pojo, exchange);
-            requests.put(corId, req);
+            req = new Request(getCorrelation(exchange), pojo, exchange);
+            requests.put(req.getCorrelationId(), req);
         }
         return req;
     }
+    
+    protected Request getRequest(MessageExchange exchange) throws MessagingException {
+        Object correlation = getCorrelation(exchange);
+        return correlation == null ? null : requests.get(correlation);
+    }
 
     protected void onConsumerExchange(MessageExchange exchange) throws Exception {
-        Object corId = exchange.getExchangeId();
-        Request req = requests.remove(corId);
+        Request req = getOrCreateCurrentRequest(exchange);
         if (req == null) {
             throw new IllegalStateException("Receiving unknown consumer exchange: " + exchange);
         }
         currentRequest.set(req);
-        // If the bean implements MessageExchangeListener,
-        // just call the method
-        if (req.getBean() instanceof MessageExchangeListener) {
+        
+        // if there's a holder for this exchange, act upon that
+        // else invoke the MessageExchangeListener interface
+        if (exchanges.containsKey(exchange.getExchangeId())) {
+            exchanges.remove(exchange.getExchangeId()).set(exchange);
+            evaluateCallbacks(req);
+            
+            //we should call done() on the consumer exchange here on behalf of the Destination that sent it
+            if (exchange instanceof InOut && ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                done(exchange);
+            }
+        } else if (req.getBean() instanceof MessageExchangeListener) {
             ((MessageExchangeListener) req.getBean()).onMessageExchange(exchange);
         } else {
-            Holder me = exchanges.get(exchange.getExchangeId());
-            if (me == null) {
-                throw new IllegalStateException("Consumer exchange not found");
-            }
-            me.set(exchange);
-            evaluateCallbacks(req);
+            throw new IllegalStateException("No known consumer exchange found and bean does not implement MessageExchangeListener");
         }
-        checkEndOfRequest(req, corId);
+        checkEndOfRequest(req);
         currentRequest.set(null);
     }
 
@@ -403,7 +417,7 @@
             URIResolver.configureExchange(me, getServiceUnit().getComponent().getComponentContext(), uri);
             MessageUtil.transferTo(message, me, "in");
             final Holder h = new Holder();
-            requests.put(me.getExchangeId(), currentRequest.get());
+            getOrCreateCurrentRequest(me).addExchange(me);
             exchanges.put(me.getExchangeId(), h);
             BeanEndpoint.this.send(me);
             return h;
@@ -411,16 +425,40 @@
             throw new RuntimeException(e);
         }
     }
+    
+    @Override
+    protected void send(MessageExchange me) throws MessagingException {
+        checkEndOfRequest(me);
+        super.send(me);
+    }
+
+    /*
+     * Checks if the request has ended with the given MessageExchange.  It will only perform the check on non-ACTIVE exchanges
+     */
+    private void checkEndOfRequest(MessageExchange me) throws MessagingException {
+        if (!ExchangeStatus.ACTIVE.equals(me.getStatus())) {
+            Request request = getRequest(me);
+            if (request != null) {
+                checkEndOfRequest(request);
+            }
+        }
+    }
 
-    protected void checkEndOfRequest(Request request, Object corId) {
-        if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
-            Object beanFromRequest = request.getBean();
-            if (beanFromRequest != bean) {
-                ReflectionUtils.callLifecycleMethod(beanFromRequest, PreDestroy.class);
-            }
-            //request.setBean(null);
-            //request.setExchange(null);
-            requests.remove(corId);
+    /**
+     * Checks if the request has ended.  If the request has ended, 
+     * <ul>
+     * <li>the request object is removed from the map of pending requests</li>
+     * <li>if the bean was created for that request, its PreDestroy lifecycle method is called</li>
+     * </ul>
+     * 
+     * @param req the Request instance to check
+     */
+    protected void checkEndOfRequest(Request req) {
+        if (req.isFinished()) {
+            requests.remove(req.getCorrelationId());
+            if (req.getBean() != bean) {
+                ReflectionUtils.callLifecycleMethod(req.getBean(), PreDestroy.class);
+            }
         }
     }
 
@@ -442,6 +480,9 @@
                 correlationExpression = new org.apache.servicemix.expression.Expression() {
                     public Object evaluate(MessageExchange exchange, NormalizedMessage message) 
                         throws MessagingException {
+                        if (exchange.getProperty(CORRELATION_ID) != null) {
+                            return exchange.getProperty(CORRELATION_ID);
+                        }
                         return exchange.getExchangeId();
                     }
                 };
@@ -573,12 +614,16 @@
 
         public void send(MessageExchange messageExchange) throws MessagingException {
             try {
+                Request req = getOrCreateCurrentRequest(messageExchange);
                 if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
                         && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
-                    Request req = getOrCreateCurrentRequest(messageExchange);
                     if (!(req.getBean() instanceof MessageExchangeListener)) {
                         throw new IllegalStateException("A bean acting as a consumer and using the channel to send exchanges must implement the MessageExchangeListener interface");
                     }
+                    req.addExchange(messageExchange);
+                }
+                if (messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
+                    checkEndOfRequest(req);
                 }
                 getChannel().send(messageExchange);
             } catch (MessagingException e) {
@@ -589,12 +634,13 @@
         }
 
         public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
+            checkEndOfRequest(messageExchange);
             return getChannel().sendSync(messageExchange);
         }
 
         public boolean sendSync(MessageExchange messageExchange, long l) throws MessagingException {
+            checkEndOfRequest(messageExchange);
             return getChannel().sendSync(messageExchange, l);
         }
-
     }
 }

Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java Tue Feb 17 12:22:51 2009
@@ -23,7 +23,6 @@
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.ExchangeStatus;
-import javax.xml.transform.TransformerException;
 import javax.xml.transform.Source;
 import javax.xml.transform.dom.DOMSource;
 

Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java Tue Feb 17 12:22:51 2009
@@ -17,27 +17,32 @@
 package org.apache.servicemix.bean.support;
 
 import java.lang.reflect.Method;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 
+import org.apache.servicemix.bean.BeanEndpoint;
+
 public class Request {
     private Object bean;
-    private MessageExchange exchange;
-    private Set<String> sentExchanges;
     // Keep track of callbacks already called, so that the same callback
     // can not be called twice
     private Map<Method, Boolean> callbacks;
+    private Object correlationId;
+    private final Set<MessageExchange> exchanges = new HashSet<MessageExchange>();
     
     public Request() {
     }
     
-    public Request(Object bean, MessageExchange exchange) {
+    public Request(Object correlationId, Object bean, MessageExchange exchange) {
+        this.correlationId = correlationId;
         this.bean = bean;
-        this.exchange = exchange;
+        exchanges.add(exchange);
     }
     
     /**
@@ -53,26 +58,9 @@
     public void setBean(Object bean) {
         this.bean = bean;
     }
-    /**
-     * @return the exchange
-     */
-    public MessageExchange getExchange() {
-        return exchange;
-    }
-    /**
-     * @param exchange the exchange to set
-     */
-    public void setExchange(MessageExchange exchange) {
-        this.exchange = exchange;
-    }
-    /**
-     * @param id the id of the exchange sent 
-     */
-    public void addSentExchange(String id) {
-        if (sentExchanges == null) {
-            sentExchanges = new HashSet<String>();
-        }
-        sentExchanges.add(id);
+    
+    public Object getCorrelationId() {
+        return correlationId;
     }
 
     /**
@@ -85,4 +73,35 @@
         return callbacks;
     }
 
+    /**
+     * Check if this request is completely finished.  
+     *  
+     * @return <code>true</code> if none of the exchanges involved in this request is still ACTIVE
+     */
+    public boolean isFinished() {
+        for (MessageExchange exchange : exchanges) {
+            if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
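+     * Add an exchange to this request.  All exchanges that are added have to be finished before the request itself is considered finished.
+     * @param exchange the exchange to add; the correlation id of this request is set on it as a property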
+     */
+    public void addExchange(MessageExchange exchange) {
+        exchanges.add(exchange);
+        exchange.setProperty(BeanEndpoint.CORRELATION_ID, correlationId);
+    }
+    
+    /**
+     * Get all the MessageExchanges that are involved in this request
+     * 
+     * @return an unmodifiable set of {@link MessageExchange}s
+     */
+    public Set<MessageExchange> getExchanges() {
+        return Collections.unmodifiableSet(exchanges);
+    }
 }

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+
+public abstract class AbstractBeanComponentTest extends TestCase {
+    
+    protected DefaultServiceMixClient client;
+    protected JBIContainer container;
+    protected ExchangeCompletedListener listener;
+    protected BeanComponent component;
+
+    protected void setUp() throws Exception {
+        container = new JBIContainer();
+        container.setEmbedded(true);
+        container.setUseMBeanServer(false);
+        container.setCreateMBeanServer(false);
+        configureContainer();
+        listener = new ExchangeCompletedListener();
+        container.addListener(listener);
+        
+        container.init();
+        container.start();
+
+        component = new BeanComponent();
+        container.activateComponent(component, "servicemix-bean");
+        
+        client = new DefaultServiceMixClient(container);
+    }
+
+    protected void tearDown() throws Exception {
+        listener.assertExchangeCompleted();
+        container.shutDown();
+    }
+
+    protected abstract void configureContainer();
+    
+    @SuppressWarnings("unchecked")
+    protected void assertBeanEndpointRequestsMapEmpty(BeanEndpoint beanEndpoint) throws Exception {
+        Field requestsMapField = BeanEndpoint.class.getDeclaredField("requests");
+        requestsMapField.setAccessible(true);
+        Map requestsMap = (Map) requestsMapField.get(beanEndpoint);
+        if (requestsMap.size() > 0) {
+            Thread.sleep(1000);
+        }
+        assertEquals("There should be no more pending requests on " + beanEndpoint, 0, requestsMap.size());
+    }
+}

Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+public class BeanEndpointInOptionalOutSedaTest extends BeanEndpointInOptionalOutTest {
+    
+    protected void configureContainer() {
+        container.setFlowName("seda");
+    }    
+}

Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOptionalOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.listener.MessageExchangeListener;
+
+/**
+ * A set of tests for checking InOptionalOut exchange handling by a bean endpoint
+ */
+public class BeanEndpointInOptionalOutTest extends AbstractBeanComponentTest {
+    
+    private static final QName IN_OPTIONAL_OUT_PRODUCER = new QName("urn:test", "ioo-producer");
+    private static final QName IN_OPTIONAL_OUT_CONSUMER = new QName("urn:test", "ioo-consumer");
+
+    protected void configureContainer() {
+        container.setFlowName("st");
+    }
+    
+    //we first have a set of tests that send an InOptionalOut exchange to the bean endpoint
+    public void testInOptionalOutWithBeanType() throws Exception {
+        BeanEndpoint endpoint = createBeanEndpoint(MyInOptionalOutBean.class, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+    
+    public void testInOptionalOutReturnsOut() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint endpoint = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        client.done(io);
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+
+    public void testInOptionalOutReturnsFault() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.fault = new StringSource("<failed_at_provider/>");
+        BeanEndpoint endpoint = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        client.done(io);
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+
+    public void testInOptionalOutClientFault() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint endpoint = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        Fault fault = io.createFault();
+        fault.setContent(new StringSource("<failed_at_consumer/>"));
+        client.fail(io, fault);
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+
+    // this is a set of tests where the bean endpoint also acts as consumer and sends InOptionalOut exchanges
+    public void testInOptionalOutConsumerDone() throws Exception {
+        BeanEndpoint provider = createBeanEndpoint(MyInOptionalOutBean.class, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("send"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    public void testConsumerInOptionalOutProviderReturnsOut() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint provider = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("send"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    public void testConsumerInOptionalOutProviderReturnsFault() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.fault = new StringSource("<fault_at_provider/>");
+        BeanEndpoint provider = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("send"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    public void testConsumerInOptionalOutConsumerReturnsFault() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint provider = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("sendAndFault"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    private BeanEndpoint createConsumerEndpoint() throws Exception {
+        MyConsumerBean bean = new MyConsumerBean();
+        bean.target = IN_OPTIONAL_OUT_PRODUCER;
+        BeanEndpoint endpoint = new BeanEndpoint();
+        endpoint.setBean(bean);
+        endpoint.setService(IN_OPTIONAL_OUT_CONSUMER);
+        endpoint.setEndpoint("endpoint");
+        component.addEndpoint(endpoint);
+        return endpoint;
+    }
+    
+    private BeanEndpoint createBeanEndpoint(Object bean, QName service) {
+        BeanEndpoint transformEndpoint = new BeanEndpoint();
+        transformEndpoint.setBean(bean);
+        transformEndpoint.setService(service);
+        transformEndpoint.setEndpoint("endpoint");
+        return transformEndpoint;
+    }
+      
+    private BeanEndpoint createBeanEndpoint(Class<?> type, QName service) {
+        BeanEndpoint endpoint = new BeanEndpoint();
+        endpoint.setBeanType(type);
+        endpoint.setService(service);
+        endpoint.setEndpoint("endpoint");
+        return endpoint;
+    }
+    
+    public static final class MyInOptionalOutBean implements MessageExchangeListener {
+        
+        private Source fault;
+        private Source response;
+        
+        @Resource
+        private DeliveryChannel channel;
+
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange instanceof InOptionalOut) {
+                onInOptionalOut((InOptionalOut) exchange);
+            } else {
+                exchange.setError(new Exception("Only InOptionalOut supported here"));
+            }
+        }
+
+        private void onInOptionalOut(InOptionalOut exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (response != null) {
+                    exchange.setOutMessage(exchange.createMessage());
+                    exchange.getOutMessage().setContent(response);
+                    response = null;
+                } else if (fault != null) {
+                    exchange.setFault(exchange.createFault());
+                    exchange.getFault().setContent(fault);
+                    fault = null;
+                } else {
+                    exchange.setStatus(ExchangeStatus.DONE);
+                }
+                channel.send(exchange);
+            }
+        }
+    }
+    
+    public static final class MyConsumerBean implements MessageExchangeListener {
+        
+        @Resource
+        private DeliveryChannel channel;
+        private QName target;
+        private MessageExchange original;
+        private Source fault;
+                
+        public void send() throws MessagingException {
+            InOptionalOut ioo = channel.createExchangeFactory().createInOptionalOutExchange();
+            ioo.setService(target);
+            ioo.setInMessage(ioo.createMessage());
+            ioo.getMessage("in").setContent(new StringSource("<hello/>"));
+            channel.send(ioo);
+        }
+
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getRole() == Role.PROVIDER) {
+                original = exchange;
+                if (exchange.getOperation().equals(new QName("sendAndFault"))) {
+                    fault = new StringSource("<faulted_by_consumer/>");
+                }
+                send();
+            } else {                
+                if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                    if (fault != null) {
+                        exchange.setFault(exchange.createFault());
+                        exchange.getFault().setContent(fault);
+                        fault = null;
+                    } else {
+                        exchange.setStatus(ExchangeStatus.DONE);
+                        done();
+                    }
+                    channel.send(exchange);
+                } else {
+                    done();
+                }
+            }
+        }
+
+        private void done() throws MessagingException {
+            original.setStatus(ExchangeStatus.DONE);
+            channel.send(original);
+        }
+    }
+}

Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+
+public class TransformBeanSupportSedaFlowTest extends TransformBeanSupportTest {
+
+    protected void configureContainer() {
+        container.setFlowName("seda");
+    }
+    
+}

Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java Tue Feb 17 12:22:51 2009
@@ -16,62 +16,62 @@
  */
 package org.apache.servicemix.bean;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.MessageExchange.Role;
 import javax.xml.namespace.QName;
 
-import junit.framework.TestCase;
-
-import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.bean.support.ExchangeTarget;
 import org.apache.servicemix.bean.support.TransformBeanSupport;
-import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.common.util.MessageUtil;
 import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.expression.JAXPXPathExpression;
+import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
-import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.servicemix.jbi.listener.MessageExchangeListener;
 import org.apache.servicemix.tck.ReceiverComponent;
+import org.w3c.dom.Element;
 
-public class TransformBeanSupportTest extends TestCase {
+public class TransformBeanSupportTest extends AbstractBeanComponentTest {
 
-    protected DefaultServiceMixClient client;
-    protected JBIContainer container;
-    protected ExchangeCompletedListener listener;
-    protected BeanComponent component;
+    protected void configureContainer() {
+        container.setFlowName("st");
+    }
+    
+    public void testInOut() throws Exception {
+        TransformBeanSupport transformer = new MyTransformer();
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+        component.addEndpoint(transformEndpoint);
 
-    protected void setUp() throws Exception {
-        container = new JBIContainer();
-        container.setEmbedded(true);
-        container.setUseMBeanServer(false);
-        container.setCreateMBeanServer(false);
-        configureContainer();
-        listener = new ExchangeCompletedListener();
-        container.addListener(listener);
+        MessageExchange io = client.createInOutExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
         
-        container.init();
-        container.start();
-
-        component = new BeanComponent();
-        container.activateComponent(component, "servicemix-bean");
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+        assertEquals("hello", e.getNodeName());
         
-        client = new DefaultServiceMixClient(container);
-    }
-
-    protected void tearDown() throws Exception {
-        listener.assertExchangeCompleted();
-        container.shutDown();
-    }
-
-    protected void configureContainer() throws Exception {
-        container.setFlowName("st");
+        client.done(io);
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
     }
     
-    public void testInOut() throws Exception {
+    public void testInOutWithFault() throws Exception {
         TransformBeanSupport transformer = new MyTransformer();
         BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
         component.addEndpoint(transformEndpoint);
@@ -83,11 +83,34 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
-        assertEquals("<hello/>", new SourceTransformer().contentToString(io.getMessage("out")));
+        Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+        assertEquals("hello", e.getNodeName());
+        
+        client.fail(io, new Exception("We failed to handle the response"));
+        assertEquals(ExchangeStatus.ERROR, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+    }
+    
+    public void testInOutWithBeanType() throws Exception {
+        BeanEndpoint endpoint = createBeanEndpoint(AssertSameInstancePojo.class);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOutExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+        assertEquals("hello", e.getNodeName());
         
         client.done(io);
         assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
     }
+    
+    
 
     public void testInOnly() throws Exception {
         TransformBeanSupport transformer = createTransformer("receiver");
@@ -104,6 +127,28 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+        
+        receiver.getMessageList().assertMessagesReceived(1);
+    }
+    
+    public void testInOnlyWithCorrelation() throws Exception {
+        TransformBeanSupport transformer = createTransformer("receiver");
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+        transformEndpoint.setCorrelationExpression(new JAXPXPathExpression("/message/@id"));
+        component.addEndpoint(transformEndpoint);
+
+        ReceiverComponent receiver = new ReceiverComponent();
+        activateComponent(receiver, "receiver");
+        
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<message id='1'/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
         
         receiver.getMessageList().assertMessagesReceived(1);
     }
@@ -122,6 +167,26 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.ERROR, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+    }
+    
+    public void testInOnlyWithDestination() throws Exception {
+        BeanEndpoint endpoint = createBeanEndpoint(MyDestinationTransformer.class);
+        component.addEndpoint(endpoint);
+
+        ActivationSpec spec = new ActivationSpec(new EchoComponent());
+        spec.setService(new QName("test", "receiver"));
+        spec.setComponentName("receiver");
+        container.activateComponent(spec);
+        
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(endpoint);
     }
 
     public void testRobustInOnly() throws Exception {
@@ -139,6 +204,7 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
         
         receiver.getMessageList().assertMessagesReceived(1);
     }
@@ -159,6 +225,7 @@
         assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
         assertNotNull(io.getFault());
         client.done(io);
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
     }
 
     public void testRobustInOnlyWithFaultAndError() throws Exception {
@@ -177,6 +244,7 @@
         assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
         assertNotNull(io.getFault());
         client.fail(io, new Exception("I do not like faults"));
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
     }
 
     private MyTransformer createTransformer(String targetService) {
@@ -195,6 +263,14 @@
         return transformEndpoint;
     }
     
+    private BeanEndpoint createBeanEndpoint(Class<?> type) {
+        BeanEndpoint endpoint = new BeanEndpoint();
+        endpoint.setBeanType(type);
+        endpoint.setService(new QName("transform"));
+        endpoint.setEndpoint("endpoint");
+        return endpoint;
+    }
+    
     protected void activateComponent(ComponentSupport comp, String name) throws Exception {
         comp.setService(new QName(name));
         comp.setEndpoint("endpoint");
@@ -207,8 +283,8 @@
             return true;
         }
     }
-
-    public static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+    
+    public static class ReturnErrorComponent extends ComponentSupport implements org.apache.servicemix.MessageExchangeListener {
 
         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
@@ -217,7 +293,7 @@
         }
     }
 
-    public static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+    public static class ReturnFaultComponent extends ComponentSupport implements org.apache.servicemix.MessageExchangeListener {
         
         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
@@ -228,4 +304,57 @@
         }
     }
     
+    public static class AssertSameInstancePojo implements MessageExchangeListener {
+        
+        @Resource 
+        private DeliveryChannel channel;
+        
+        private String id;
+
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            assertId(exchange);
+            if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                MessageUtil.enableContentRereadability(exchange.getMessage("in"));
+                MessageUtil.transferInToOut(exchange, exchange);
+                channel.send(exchange);
+            }
+        }
+
+        private void assertId(MessageExchange exchange) {
+            if (exchange.getStatus().equals(ExchangeStatus.ACTIVE)) {
+                id = exchange.getExchangeId();
+            } else {
+                // make sure that the same object is being used to handle the Exchange with status DONE 
+                assertEquals(id, exchange.getExchangeId());
+            }
+        }        
+    }
+    
+    public static class MyDestinationTransformer implements MessageExchangeListener {
+        
+        @org.apache.servicemix.bean.ExchangeTarget(uri="service:test:receiver")
+        private Destination receiver;
+        
+        @Resource
+        private DeliveryChannel channel;
+        
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE && exchange instanceof InOnly) {
+                NormalizedMessage forward = receiver.createMessage();
+                forward.setContent(exchange.getMessage("in").getContent());
+                Future<NormalizedMessage> response = receiver.send(forward);
+                //let's wait for the response to come back
+                try {
+                    response.get();
+                    exchange.setStatus(ExchangeStatus.DONE);
+                } catch (InterruptedException e) {
+                    exchange.setError(e);
+                } catch (ExecutionException e) {
+                    exchange.setError(e);
+                } finally {
+                    channel.send(exchange);
+                }
+            }
+        }
+    }
 }

Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java Tue Feb 17 12:22:51 2009
@@ -41,45 +41,41 @@
 	private DeliveryChannel channel;
 
 	@PostConstruct
-	public void init() {
-		senderThread = new Thread(
+    public void init() {
+        senderThread = new Thread(
 
-		new Runnable() {
-			public void run() {
-				while (keepRunning.get()) {
-
-					try {
-						String text = "<Hello/>";
-						InOnly exchange = channel
-								.createExchangeFactoryForService(target)
-								.createInOnlyExchange();
-						NormalizedMessage msg = exchange.createMessage();
-						msg.setContent(new StringSource(text));
+        new Runnable() {
+            public void run() {
+                while (keepRunning.get()) {
+                    try {
+                        String text = "<Hello/>";
+                        InOnly exchange = channel.createExchangeFactoryForService(target).createInOnlyExchange();
+                        NormalizedMessage msg = exchange.createMessage();
+                        msg.setContent(new StringSource(text));
                         exchange.setInMessage(msg);
-                        System.out.println("Sending message: " + text);
-						channel.send(exchange);
-					} catch (Exception e) {
-						e.printStackTrace();
-					}
-
-					try {
-						Thread.sleep(500);
-					} catch (InterruptedException e) {
-						// ignore
-					}
-				}
-			}
-		});
-		senderThread.start();
-	}
+                        channel.send(exchange);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                    try {
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            }
+        });
+        senderThread.start();
+    }
 
-	@PreDestroy
-	public void destroy() {
-		keepRunning.set(false);
-		if (senderThread != null && senderThread.isAlive()) {
-			senderThread.interrupt();
-		}
-	}
+    @PreDestroy
+    public void destroy() {
+        keepRunning.set(false);
+        if (senderThread != null && senderThread.isAlive()) {
+            senderThread.interrupt();
+        }
+    }
 
     public void onMessageExchange(MessageExchange messageExchange) throws MessagingException {
         // Do nothing

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.support;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.BeanEndpoint;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link Request}
+ */
+public class RequestTest extends TestCase {
+    
+    @Test
+    public void testIsFinishedOnStatus() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), exchange);
+        assertFalse(request.isFinished());
+        exchange.setStatus(ExchangeStatus.DONE);
+        assertTrue(request.isFinished());
+    }
+    
+    @Test
+    public void testIsFinishedWhenAllExchangesDoneOrError() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), exchange);
+        assertFalse(request.isFinished());
+        
+        MessageExchange second = createMockExchange("my-second-id");
+        request.addExchange(second);
+        exchange.setStatus(ExchangeStatus.DONE);
+        assertFalse(request.isFinished());
+        
+        second.setStatus(ExchangeStatus.ERROR);
+        assertTrue(request.isFinished());
+    }
+    
+    public void testAddExchangeSetsCorrelationId() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), exchange);
+
+        MessageExchange second = createMockExchange("my-second-id");
+        request.addExchange(second);
+        assertEquals("my-correlation-id", second.getProperty(BeanEndpoint.CORRELATION_ID));
+    }
+    
+    public void testNoSentExchangeForCorrelationId() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), exchange);
+        request.addExchange(exchange);
+        assertEquals("We shouldn't have duplicate MessageExchange instances", 1, request.getExchanges().size());
+    }
+    
+    private MessageExchange createMockExchange(String id) {
+        MockMessageExchange exchange = new MockMessageExchange();
+        exchange.setExchangeId(id);
+        exchange.setStatus(ExchangeStatus.ACTIVE);
+        return exchange;
+    }
+}

Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties Tue Feb 17 12:22:51 2009
@@ -21,7 +21,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=DEBUG, out
 
 log4j.logger.org.springframework=INFO
 log4j.logger.org.apache.activemq=INFO