You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/02/17 13:22:51 UTC
svn commit: r744995 - in
/servicemix/components/engines/servicemix-bean/trunk: ./
src/main/java/org/apache/servicemix/bean/
src/main/java/org/apache/servicemix/bean/pojos/
src/main/java/org/apache/servicemix/bean/support/
src/test/java/org/apache/servi...
Author: gertv
Date: Tue Feb 17 12:22:51 2009
New Revision: 744995
URL: http://svn.apache.org/viewvc?rev=744995&view=rev
Log:
SMXCOMP-20: BeanEndpoint.requests map leaks a request when sending in-only mep with seda flow to a TransformBeanSupport-extended bean
Added:
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java (with props)
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java (with props)
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java (with props)
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java (with props)
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java (with props)
Modified:
servicemix/components/engines/servicemix-bean/trunk/pom.xml
servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java
servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java
servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java
servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties
Modified: servicemix/components/engines/servicemix-bean/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/pom.xml?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/pom.xml (original)
+++ servicemix/components/engines/servicemix-bean/trunk/pom.xml Tue Feb 17 12:22:51 2009
@@ -118,6 +118,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.14</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanComponent.java Tue Feb 17 12:22:51 2009
@@ -40,7 +40,7 @@
* @org.apache.xbean.XBean element="component" description="Bean Component"
*/
public class BeanComponent extends DefaultComponent implements ApplicationContextAware {
-
+
private BeanEndpoint[] endpoints;
private String[] searchPackages;
private ApplicationContext applicationContext;
Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Tue Feb 17 12:22:51 2009
@@ -75,6 +75,11 @@
* @org.apache.xbean.XBean element="endpoint"
*/
public class BeanEndpoint extends ProviderEndpoint implements ApplicationContextAware {
+
+ /**
+ * Property name for the correlation id that is being set on exchanges by the BeanEndpoint
+ */
+ public static final String CORRELATION_ID = BeanEndpoint.class.getName().replaceAll("\\.", "_") + "_correlation";
private ApplicationContext applicationContext;
private String beanName;
@@ -102,7 +107,7 @@
public void start() throws Exception {
super.start();
if (serviceEndpoint == null) {
- serviceEndpoint = getContext().getEndpoint(getService(), getEndpoint());
+ serviceEndpoint = getContext().getEndpoint(getService(), getEndpoint());
}
Object pojo = getBean();
if (pojo != null) {
@@ -216,7 +221,6 @@
}
protected void onProviderExchange(MessageExchange exchange) throws Exception {
- Object corId = getCorrelation(exchange);
Request req = getOrCreateCurrentRequest(exchange);
currentRequest.set(req);
synchronized (req) {
@@ -257,14 +261,16 @@
}
}
}
- checkEndOfRequest(req, corId);
+ checkEndOfRequest(req);
currentRequest.set(null);
}
}
protected Request getOrCreateCurrentRequest(MessageExchange exchange) throws ClassNotFoundException, InstantiationException, IllegalAccessException, MessagingException {
- Object corId = getCorrelation(exchange);
- Request req = requests.get(corId);
+ if (currentRequest.get() != null) {
+ return currentRequest.get();
+ }
+ Request req = getRequest(exchange);
if (req == null) {
Object pojo = getBean();
if (pojo == null) {
@@ -272,32 +278,40 @@
injectBean(pojo);
ReflectionUtils.callLifecycleMethod(pojo, PostConstruct.class);
}
- req = new Request(pojo, exchange);
- requests.put(corId, req);
+ req = new Request(getCorrelation(exchange), pojo, exchange);
+ requests.put(req.getCorrelationId(), req);
}
return req;
}
+
+ protected Request getRequest(MessageExchange exchange) throws MessagingException {
+ Object correlation = getCorrelation(exchange);
+ return correlation == null ? null : requests.get(correlation);
+ }
protected void onConsumerExchange(MessageExchange exchange) throws Exception {
- Object corId = exchange.getExchangeId();
- Request req = requests.remove(corId);
+ Request req = getOrCreateCurrentRequest(exchange);
if (req == null) {
throw new IllegalStateException("Receiving unknown consumer exchange: " + exchange);
}
currentRequest.set(req);
- // If the bean implements MessageExchangeListener,
- // just call the method
- if (req.getBean() instanceof MessageExchangeListener) {
+
+ // if there's a holder for this exchange, act upon that
+ // else invoke the MessageExchangeListener interface
+ if (exchanges.containsKey(exchange.getExchangeId())) {
+ exchanges.remove(exchange.getExchangeId()).set(exchange);
+ evaluateCallbacks(req);
+
+ //we should done() the consumer exchange here on behalf of the Destination who sent it
+ if (exchange instanceof InOut && ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+ done(exchange);
+ }
+ } else if (req.getBean() instanceof MessageExchangeListener) {
((MessageExchangeListener) req.getBean()).onMessageExchange(exchange);
} else {
- Holder me = exchanges.get(exchange.getExchangeId());
- if (me == null) {
- throw new IllegalStateException("Consumer exchange not found");
- }
- me.set(exchange);
- evaluateCallbacks(req);
+ throw new IllegalStateException("No known consumer exchange found and bean does not implement MessageExchangeListener");
}
- checkEndOfRequest(req, corId);
+ checkEndOfRequest(req);
currentRequest.set(null);
}
@@ -403,7 +417,7 @@
URIResolver.configureExchange(me, getServiceUnit().getComponent().getComponentContext(), uri);
MessageUtil.transferTo(message, me, "in");
final Holder h = new Holder();
- requests.put(me.getExchangeId(), currentRequest.get());
+ getOrCreateCurrentRequest(me).addExchange(me);
exchanges.put(me.getExchangeId(), h);
BeanEndpoint.this.send(me);
return h;
@@ -411,16 +425,40 @@
throw new RuntimeException(e);
}
}
+
+ @Override
+ protected void send(MessageExchange me) throws MessagingException {
+ checkEndOfRequest(me);
+ super.send(me);
+ }
+
+ /*
+ * Checks if the request has ended with the given MessageExchange. It will only perform the check on non-ACTIVE exchanges
+ */
+ private void checkEndOfRequest(MessageExchange me) throws MessagingException {
+ if (!ExchangeStatus.ACTIVE.equals(me.getStatus())) {
+ Request request = getRequest(me);
+ if (request != null) {
+ checkEndOfRequest(request);
+ }
+ }
+ }
- protected void checkEndOfRequest(Request request, Object corId) {
- if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
- Object beanFromRequest = request.getBean();
- if (beanFromRequest != bean) {
- ReflectionUtils.callLifecycleMethod(beanFromRequest, PreDestroy.class);
- }
- //request.setBean(null);
- //request.setExchange(null);
- requests.remove(corId);
+ /**
+ * Checks if the request has ended. If the request has ended,
+ * <ul>
+ * <li>the request object is being removed from the list of pending requests</li>
+ * <li>if the bean was created for that request, it is now being destroyed</li>
+ * </ul>
+ *
+ * @param req the Request instance to check
+ */
+ protected void checkEndOfRequest(Request req) {
+ if (req.isFinished()) {
+ requests.remove(req.getCorrelationId());
+ if (req.getBean() != bean) {
+ ReflectionUtils.callLifecycleMethod(req.getBean(), PreDestroy.class);
+ }
}
}
@@ -442,6 +480,9 @@
correlationExpression = new org.apache.servicemix.expression.Expression() {
public Object evaluate(MessageExchange exchange, NormalizedMessage message)
throws MessagingException {
+ if (exchange.getProperty(CORRELATION_ID) != null) {
+ return exchange.getProperty(CORRELATION_ID);
+ }
return exchange.getExchangeId();
}
};
@@ -573,12 +614,16 @@
public void send(MessageExchange messageExchange) throws MessagingException {
try {
+ Request req = getOrCreateCurrentRequest(messageExchange);
if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
&& messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
- Request req = getOrCreateCurrentRequest(messageExchange);
if (!(req.getBean() instanceof MessageExchangeListener)) {
throw new IllegalStateException("A bean acting as a consumer and using the channel to send exchanges must implement the MessageExchangeListener interface");
}
+ req.addExchange(messageExchange);
+ }
+ if (messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
+ checkEndOfRequest(req);
}
getChannel().send(messageExchange);
} catch (MessagingException e) {
@@ -589,12 +634,13 @@
}
public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
+ checkEndOfRequest(messageExchange);
return getChannel().sendSync(messageExchange);
}
public boolean sendSync(MessageExchange messageExchange, long l) throws MessagingException {
+ checkEndOfRequest(messageExchange);
return getChannel().sendSync(messageExchange, l);
}
-
}
}
Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/pojos/LoggingPojo.java Tue Feb 17 12:22:51 2009
@@ -23,7 +23,6 @@
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.ExchangeStatus;
-import javax.xml.transform.TransformerException;
import javax.xml.transform.Source;
import javax.xml.transform.dom.DOMSource;
Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/Request.java Tue Feb 17 12:22:51 2009
@@ -17,27 +17,32 @@
package org.apache.servicemix.bean.support;
import java.lang.reflect.Method;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
+import org.apache.servicemix.bean.BeanEndpoint;
+
public class Request {
private Object bean;
- private MessageExchange exchange;
- private Set<String> sentExchanges;
// Keep track of callbacks already called, so that the same callback
// can not be called twice
private Map<Method, Boolean> callbacks;
+ private Object correlationId;
+ private final Set<MessageExchange> exchanges = new HashSet<MessageExchange>();
public Request() {
}
- public Request(Object bean, MessageExchange exchange) {
+ public Request(Object correlationId, Object bean, MessageExchange exchange) {
+ this.correlationId = correlationId;
this.bean = bean;
- this.exchange = exchange;
+ exchanges.add(exchange);
}
/**
@@ -53,26 +58,9 @@
public void setBean(Object bean) {
this.bean = bean;
}
- /**
- * @return the exchange
- */
- public MessageExchange getExchange() {
- return exchange;
- }
- /**
- * @param exchange the exchange to set
- */
- public void setExchange(MessageExchange exchange) {
- this.exchange = exchange;
- }
- /**
- * @param id the id of the exchange sent
- */
- public void addSentExchange(String id) {
- if (sentExchanges == null) {
- sentExchanges = new HashSet<String>();
- }
- sentExchanges.add(id);
+
+ public Object getCorrelationId() {
+ return correlationId;
}
/**
@@ -85,4 +73,35 @@
return callbacks;
}
+ /**
+ * Check if this request is completely finished.
+ *
+ * @return <code>true</code> if both the Exchange is DONE and there are no more outstanding sent exchanges
+ */
+ public boolean isFinished() {
+ for (MessageExchange exchange : exchanges) {
+ if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Add an exchange to this request. All exchanges that are added to the request have to be finished
+ * @param exchange
+ */
+ public void addExchange(MessageExchange exchange) {
+ exchanges.add(exchange);
+ exchange.setProperty(BeanEndpoint.CORRELATION_ID, correlationId);
+ }
+
+ /**
+ * Get all the MessageExchanges that are involved in this request
+ *
+ * @return an unmodifiable list of {@link MessageExchange}s
+ */
+ public Set<MessageExchange> getExchanges() {
+ return Collections.unmodifiableSet(exchanges);
+ }
}
Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+
+public abstract class AbstractBeanComponentTest extends TestCase {
+
+ protected DefaultServiceMixClient client;
+ protected JBIContainer container;
+ protected ExchangeCompletedListener listener;
+ protected BeanComponent component;
+
+ protected void setUp() throws Exception {
+ container = new JBIContainer();
+ container.setEmbedded(true);
+ container.setUseMBeanServer(false);
+ container.setCreateMBeanServer(false);
+ configureContainer();
+ listener = new ExchangeCompletedListener();
+ container.addListener(listener);
+
+ container.init();
+ container.start();
+
+ component = new BeanComponent();
+ container.activateComponent(component, "servicemix-bean");
+
+ client = new DefaultServiceMixClient(container);
+ }
+
+ protected void tearDown() throws Exception {
+ listener.assertExchangeCompleted();
+ container.shutDown();
+ }
+
+ protected abstract void configureContainer();
+
+ @SuppressWarnings("unchecked")
+ protected void assertBeanEndpointRequestsMapEmpty(BeanEndpoint beanEndpoint) throws Exception {
+ Field requestsMapField = BeanEndpoint.class.getDeclaredField("requests");
+ requestsMapField.setAccessible(true);
+ Map requestsMap = (Map) requestsMapField.get(beanEndpoint);
+ if (requestsMap.size() > 0) {
+ Thread.sleep(1000);
+ }
+ assertEquals("There should be no more pending requests on " + beanEndpoint, 0, requestsMap.size());
+ }
+}
Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+public class BeanEndpointInOptionalOutSedaTest extends BeanEndpointInOptionalOutTest {
+
+ protected void configureContainer() {
+ container.setFlowName("seda");
+ }
+}
Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOptionalOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.listener.MessageExchangeListener;
+
+/**
+ * A set of tests for checking InOptionalOut exchange handling by a bean endpoint
+ */
+public class BeanEndpointInOptionalOutTest extends AbstractBeanComponentTest {
+
+ private static final QName IN_OPTIONAL_OUT_PRODUCER = new QName("urn:test", "ioo-producer");
+ private static final QName IN_OPTIONAL_OUT_CONSUMER = new QName("urn:test", "ioo-consumer");
+
+ protected void configureContainer() {
+ container.setFlowName("st");
+ }
+
+ //we first have a set of tests that send an InOptionalOut exchange to the bean endpoint
+ public void testInOptionalOutWithBeanType() throws Exception {
+ BeanEndpoint endpoint = createBeanEndpoint(MyInOptionalOutBean.class, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(endpoint);
+
+ MessageExchange io = client.createInOptionalOutExchange();
+ io.setService(IN_OPTIONAL_OUT_PRODUCER);
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(endpoint);
+ }
+
+ public void testInOptionalOutReturnsOut() throws Exception {
+ MyInOptionalOutBean bean = new MyInOptionalOutBean();
+ bean.response = new StringSource("<goodbye/>");
+ BeanEndpoint endpoint = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(endpoint);
+
+ MessageExchange io = client.createInOptionalOutExchange();
+ io.setService(IN_OPTIONAL_OUT_PRODUCER);
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+ client.done(io);
+ assertBeanEndpointRequestsMapEmpty(endpoint);
+ }
+
+ public void testInOptionalOutReturnsFault() throws Exception {
+ MyInOptionalOutBean bean = new MyInOptionalOutBean();
+ bean.fault = new StringSource("<failed_at_provider/>");
+ BeanEndpoint endpoint = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(endpoint);
+
+ MessageExchange io = client.createInOptionalOutExchange();
+ io.setService(IN_OPTIONAL_OUT_PRODUCER);
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+ client.done(io);
+ assertBeanEndpointRequestsMapEmpty(endpoint);
+ }
+
+ public void testInOptionalOutClientFault() throws Exception {
+ MyInOptionalOutBean bean = new MyInOptionalOutBean();
+ bean.response = new StringSource("<goodbye/>");
+ BeanEndpoint endpoint = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(endpoint);
+
+ MessageExchange io = client.createInOptionalOutExchange();
+ io.setService(IN_OPTIONAL_OUT_PRODUCER);
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+ Fault fault = io.createFault();
+ fault.setContent(new StringSource("<failed_at_consumer/>"));
+ client.fail(io, fault);
+ assertBeanEndpointRequestsMapEmpty(endpoint);
+ }
+
+ // this is a set of tests where the bean endpoint also acts as consumer and sends InOptionalOut exchanges
+ public void testInOptionalOutConsumerDone() throws Exception {
+ BeanEndpoint provider = createBeanEndpoint(MyInOptionalOutBean.class, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(provider);
+ BeanEndpoint consumer = createConsumerEndpoint();
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(IN_OPTIONAL_OUT_CONSUMER);
+ io.setOperation(new QName("send"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(provider);
+ assertBeanEndpointRequestsMapEmpty(consumer);
+ }
+
+ public void testConsumerInOptionalOutProviderReturnsOut() throws Exception {
+ MyInOptionalOutBean bean = new MyInOptionalOutBean();
+ bean.response = new StringSource("<goodbye/>");
+ BeanEndpoint provider = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(provider);
+ BeanEndpoint consumer = createConsumerEndpoint();
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(IN_OPTIONAL_OUT_CONSUMER);
+ io.setOperation(new QName("send"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(provider);
+ assertBeanEndpointRequestsMapEmpty(consumer);
+ }
+
+ public void testConsumerInOptionalOutProviderReturnsFault() throws Exception {
+ MyInOptionalOutBean bean = new MyInOptionalOutBean();
+ bean.fault = new StringSource("<fault_at_provider/>");
+ BeanEndpoint provider = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(provider);
+ BeanEndpoint consumer = createConsumerEndpoint();
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(IN_OPTIONAL_OUT_CONSUMER);
+ io.setOperation(new QName("send"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(provider);
+ assertBeanEndpointRequestsMapEmpty(consumer);
+ }
+
+ public void testConsumerInOptionalOutConsumerReturnsFault() throws Exception {
+ MyInOptionalOutBean bean = new MyInOptionalOutBean();
+ bean.response = new StringSource("<goodbye/>");
+ BeanEndpoint provider = createBeanEndpoint(bean, IN_OPTIONAL_OUT_PRODUCER);
+ component.addEndpoint(provider);
+ BeanEndpoint consumer = createConsumerEndpoint();
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(IN_OPTIONAL_OUT_CONSUMER);
+ io.setOperation(new QName("sendAndFault"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(provider);
+ assertBeanEndpointRequestsMapEmpty(consumer);
+ }
+
+ private BeanEndpoint createConsumerEndpoint() throws Exception {
+ MyConsumerBean bean = new MyConsumerBean();
+ bean.target = IN_OPTIONAL_OUT_PRODUCER;
+ BeanEndpoint endpoint = new BeanEndpoint();
+ endpoint.setBean(bean);
+ endpoint.setService(IN_OPTIONAL_OUT_CONSUMER);
+ endpoint.setEndpoint("endpoint");
+ component.addEndpoint(endpoint);
+ return endpoint;
+ }
+
+ private BeanEndpoint createBeanEndpoint(Object bean, QName service) {
+ BeanEndpoint transformEndpoint = new BeanEndpoint();
+ transformEndpoint.setBean(bean);
+ transformEndpoint.setService(service);
+ transformEndpoint.setEndpoint("endpoint");
+ return transformEndpoint;
+ }
+
+ private BeanEndpoint createBeanEndpoint(Class<?> type, QName service) {
+ BeanEndpoint endpoint = new BeanEndpoint();
+ endpoint.setBeanType(type);
+ endpoint.setService(service);
+ endpoint.setEndpoint("endpoint");
+ return endpoint;
+ }
+
+ public static final class MyInOptionalOutBean implements MessageExchangeListener {
+
+ private Source fault;
+ private Source response;
+
+ @Resource
+ private DeliveryChannel channel;
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange instanceof InOptionalOut) {
+ onInOptionalOut((InOptionalOut) exchange);
+ } else {
+ exchange.setError(new Exception("Only InOptionalOut supported here"));
+ }
+ }
+
+ private void onInOptionalOut(InOptionalOut exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ if (response != null) {
+ exchange.setOutMessage(exchange.createMessage());
+ exchange.getOutMessage().setContent(response);
+ response = null;
+ } else if (fault != null) {
+ exchange.setFault(exchange.createFault());
+ exchange.getFault().setContent(fault);
+ fault = null;
+ } else {
+ exchange.setStatus(ExchangeStatus.DONE);
+ }
+ channel.send(exchange);
+ }
+ }
+ }
+
+ public static final class MyConsumerBean implements MessageExchangeListener {
+
+ @Resource
+ private DeliveryChannel channel;
+ private QName target;
+ private MessageExchange original;
+ private Source fault;
+
+ public void send() throws MessagingException {
+ InOptionalOut ioo = channel.createExchangeFactory().createInOptionalOutExchange();
+ ioo.setService(target);
+ ioo.setInMessage(ioo.createMessage());
+ ioo.getMessage("in").setContent(new StringSource("<hello/>"));
+ channel.send(ioo);
+ }
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getRole() == Role.PROVIDER) {
+ original = exchange;
+ if (exchange.getOperation().equals(new QName("sendAndFault"))) {
+ fault = new StringSource("<faulted_by_consumer/>");
+ }
+ send();
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ if (fault != null) {
+ exchange.setFault(exchange.createFault());
+ exchange.getFault().setContent(fault);
+ fault = null;
+ } else {
+ exchange.setStatus(ExchangeStatus.DONE);
+ done();
+ }
+ channel.send(exchange);
+ } else {
+ done();
+ }
+ }
+ }
+
+ private void done() throws MessagingException {
+ original.setStatus(ExchangeStatus.DONE);
+ channel.send(original);
+ }
+ }
+}
Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+
+public class TransformBeanSupportSedaFlowTest extends TransformBeanSupportTest {
+
+ protected void configureContainer() {
+ container.setFlowName("seda");
+ }
+
+}
Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java Tue Feb 17 12:22:51 2009
@@ -16,62 +16,62 @@
*/
package org.apache.servicemix.bean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.xml.namespace.QName;
-import junit.framework.TestCase;
-
-import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.bean.support.ExchangeTarget;
import org.apache.servicemix.bean.support.TransformBeanSupport;
-import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.expression.JAXPXPathExpression;
+import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
-import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.servicemix.jbi.listener.MessageExchangeListener;
import org.apache.servicemix.tck.ReceiverComponent;
+import org.w3c.dom.Element;
-public class TransformBeanSupportTest extends TestCase {
+public class TransformBeanSupportTest extends AbstractBeanComponentTest {
- protected DefaultServiceMixClient client;
- protected JBIContainer container;
- protected ExchangeCompletedListener listener;
- protected BeanComponent component;
+ protected void configureContainer() {
+ container.setFlowName("st");
+ }
+
+ public void testInOut() throws Exception {
+ TransformBeanSupport transformer = new MyTransformer();
+ BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+ component.addEndpoint(transformEndpoint);
- protected void setUp() throws Exception {
- container = new JBIContainer();
- container.setEmbedded(true);
- container.setUseMBeanServer(false);
- container.setCreateMBeanServer(false);
- configureContainer();
- listener = new ExchangeCompletedListener();
- container.addListener(listener);
+ MessageExchange io = client.createInOutExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
- container.init();
- container.start();
-
- component = new BeanComponent();
- container.activateComponent(component, "servicemix-bean");
+ io = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+ Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+ assertEquals("hello", e.getNodeName());
- client = new DefaultServiceMixClient(container);
- }
-
- protected void tearDown() throws Exception {
- listener.assertExchangeCompleted();
- container.shutDown();
- }
-
- protected void configureContainer() throws Exception {
- container.setFlowName("st");
+ client.done(io);
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
}
- public void testInOut() throws Exception {
+ public void testInOutWithFault() throws Exception {
TransformBeanSupport transformer = new MyTransformer();
BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
component.addEndpoint(transformEndpoint);
@@ -83,11 +83,34 @@
io = client.receive();
assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
- assertEquals("<hello/>", new SourceTransformer().contentToString(io.getMessage("out")));
+ Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+ assertEquals("hello", e.getNodeName());
+
+ client.fail(io, new Exception("We failed to handle the reponse"));
+ assertEquals(ExchangeStatus.ERROR, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+ }
+
+ public void testInOutWithBeanType() throws Exception {
+ BeanEndpoint endpoint = createBeanEndpoint(AssertSameInstancePojo.class);
+ component.addEndpoint(endpoint);
+
+ MessageExchange io = client.createInOutExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+ Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+ assertEquals("hello", e.getNodeName());
client.done(io);
assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(endpoint);
}
+
+
public void testInOnly() throws Exception {
TransformBeanSupport transformer = createTransformer("receiver");
@@ -104,6 +127,28 @@
io = client.receive();
assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ }
+
+ public void testInOnlyWithCorrelation() throws Exception {
+ TransformBeanSupport transformer = createTransformer("receiver");
+ BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+ transformEndpoint.setCorrelationExpression(new JAXPXPathExpression("/message/@id"));
+ component.addEndpoint(transformEndpoint);
+
+ ReceiverComponent receiver = new ReceiverComponent();
+ activateComponent(receiver, "receiver");
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<message id='1'/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
receiver.getMessageList().assertMessagesReceived(1);
}
@@ -122,6 +167,26 @@
io = client.receive();
assertEquals(ExchangeStatus.ERROR, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+ }
+
+ public void testInOnlyWithDestination() throws Exception {
+ BeanEndpoint endpoint = createBeanEndpoint(MyDestinationTransformer.class);
+ component.addEndpoint(endpoint);
+
+ ActivationSpec spec = new ActivationSpec(new EchoComponent());
+ spec.setService(new QName("test", "receiver"));
+ spec.setComponentName("receiver");
+ container.activateComponent(spec);
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(endpoint);
}
public void testRobustInOnly() throws Exception {
@@ -139,6 +204,7 @@
io = client.receive();
assertEquals(ExchangeStatus.DONE, io.getStatus());
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
receiver.getMessageList().assertMessagesReceived(1);
}
@@ -159,6 +225,7 @@
assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
assertNotNull(io.getFault());
client.done(io);
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
}
public void testRobustInOnlyWithFaultAndError() throws Exception {
@@ -177,6 +244,7 @@
assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
assertNotNull(io.getFault());
client.fail(io, new Exception("I do not like faults"));
+ assertBeanEndpointRequestsMapEmpty(transformEndpoint);
}
private MyTransformer createTransformer(String targetService) {
@@ -195,6 +263,14 @@
return transformEndpoint;
}
+ private BeanEndpoint createBeanEndpoint(Class<?> type) {
+ BeanEndpoint endpoint = new BeanEndpoint();
+ endpoint.setBeanType(type);
+ endpoint.setService(new QName("transform"));
+ endpoint.setEndpoint("endpoint");
+ return endpoint;
+ }
+
protected void activateComponent(ComponentSupport comp, String name) throws Exception {
comp.setService(new QName(name));
comp.setEndpoint("endpoint");
@@ -207,8 +283,8 @@
return true;
}
}
-
- public static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+
+ public static class ReturnErrorComponent extends ComponentSupport implements org.apache.servicemix.MessageExchangeListener {
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
@@ -217,7 +293,7 @@
}
}
- public static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+ public static class ReturnFaultComponent extends ComponentSupport implements org.apache.servicemix.MessageExchangeListener {
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
@@ -228,4 +304,57 @@
}
}
+ public static class AssertSameInstancePojo implements MessageExchangeListener {
+
+ @Resource
+ private DeliveryChannel channel;
+
+ private String id;
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ assertId(exchange);
+ if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+ MessageUtil.enableContentRereadability(exchange.getMessage("in"));
+ MessageUtil.transferInToOut(exchange, exchange);
+ channel.send(exchange);
+ }
+ }
+
+ private void assertId(MessageExchange exchange) {
+ if (exchange.getStatus().equals(ExchangeStatus.ACTIVE)) {
+ id = exchange.getExchangeId();
+ } else {
+ // make sure that the same object is being used to handle the Exchange with status DONE
+ assertEquals(id, exchange.getExchangeId());
+ }
+ }
+ }
+
+ public static class MyDestinationTransformer implements MessageExchangeListener {
+
+ @org.apache.servicemix.bean.ExchangeTarget(uri="service:test:receiver")
+ private Destination receiver;
+
+ @Resource
+ private DeliveryChannel channel;
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE && exchange instanceof InOnly) {
+ NormalizedMessage forward = receiver.createMessage();
+ forward.setContent(exchange.getMessage("in").getContent());
+ Future<NormalizedMessage> response = receiver.send(forward);
+ //let's wait for the response to come back
+ try {
+ response.get();
+ exchange.setStatus(ExchangeStatus.DONE);
+ } catch (InterruptedException e) {
+ exchange.setError(e);
+ } catch (ExecutionException e) {
+ exchange.setError(e);
+ } finally {
+ channel.send(exchange);
+ }
+ }
+ }
+ }
}
Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java Tue Feb 17 12:22:51 2009
@@ -41,45 +41,41 @@
private DeliveryChannel channel;
@PostConstruct
- public void init() {
- senderThread = new Thread(
+ public void init() {
+ senderThread = new Thread(
- new Runnable() {
- public void run() {
- while (keepRunning.get()) {
-
- try {
- String text = "<Hello/>";
- InOnly exchange = channel
- .createExchangeFactoryForService(target)
- .createInOnlyExchange();
- NormalizedMessage msg = exchange.createMessage();
- msg.setContent(new StringSource(text));
+ new Runnable() {
+ public void run() {
+ while (keepRunning.get()) {
+ try {
+ String text = "<Hello/>";
+ InOnly exchange = channel.createExchangeFactoryForService(target).createInOnlyExchange();
+ NormalizedMessage msg = exchange.createMessage();
+ msg.setContent(new StringSource(text));
exchange.setInMessage(msg);
- System.out.println("Sending message: " + text);
- channel.send(exchange);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
- });
- senderThread.start();
- }
+ channel.send(exchange);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ });
+ senderThread.start();
+ }
- @PreDestroy
- public void destroy() {
- keepRunning.set(false);
- if (senderThread != null && senderThread.isAlive()) {
- senderThread.interrupt();
- }
- }
+ @PreDestroy
+ public void destroy() {
+ keepRunning.set(false);
+ if (senderThread != null && senderThread.isAlive()) {
+ senderThread.interrupt();
+ }
+ }
public void onMessageExchange(MessageExchange messageExchange) throws MessagingException {
// Do nothing
Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java?rev=744995&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java Tue Feb 17 12:22:51 2009
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.support;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.BeanEndpoint;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link Request}
+ */
+public class RequestTest extends TestCase {
+
+ @Test
+ public void testIsFinishedOnStatus() throws Exception {
+ MessageExchange exchange = createMockExchange("my-exchange-id");
+ Request request = new Request("my-correlation-id", new Object(), exchange);
+ assertFalse(request.isFinished());
+ exchange.setStatus(ExchangeStatus.DONE);
+ assertTrue(request.isFinished());
+ }
+
+ @Test
+ public void testIsFinishedWhenAllExchangesDoneOrError() throws Exception {
+ MessageExchange exchange = createMockExchange("my-exchange-id");
+ Request request = new Request("my-correlation-id", new Object(), exchange);
+ assertFalse(request.isFinished());
+
+ MessageExchange second = createMockExchange("my-second-id");
+ request.addExchange(second);
+ exchange.setStatus(ExchangeStatus.DONE);
+ assertFalse(request.isFinished());
+
+ second.setStatus(ExchangeStatus.ERROR);
+ assertTrue(request.isFinished());
+ }
+
+ public void testAddExchangeSetsCorrelationId() throws Exception {
+ MessageExchange exchange = createMockExchange("my-exchange-id");
+ Request request = new Request("my-correlation-id", new Object(), exchange);
+
+ MessageExchange second = createMockExchange("my-second-id");
+ request.addExchange(second);
+ assertEquals("my-correlation-id", second.getProperty(BeanEndpoint.CORRELATION_ID));
+ }
+
+ public void testNoSentExchangeForCorrelationId() throws Exception {
+ MessageExchange exchange = createMockExchange("my-exchange-id");
+ Request request = new Request("my-correlation-id", new Object(), exchange);
+ request.addExchange(exchange);
+ assertEquals("We shouldn't have duplicate MessageExchange instances", 1, request.getExchanges().size());
+ }
+
+ private MessageExchange createMockExchange(String id) {
+ MockMessageExchange exchange = new MockMessageExchange();
+ exchange.setExchangeId(id);
+ exchange.setStatus(ExchangeStatus.ACTIVE);
+ return exchange;
+ }
+}
Propchange: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties?rev=744995&r1=744994&r2=744995&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/resources/log4j.properties Tue Feb 17 12:22:51 2009
@@ -21,7 +21,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=DEBUG, out
log4j.logger.org.springframework=INFO
log4j.logger.org.apache.activemq=INFO