You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/10/20 11:48:00 UTC
svn commit: r586692 - in
/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src:
main/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/
test/java/org/apache/servicemix/bean/beans/
Author: gnodet
Date: Sat Oct 20 02:48:00 2007
New Revision: 586692
URL: http://svn.apache.org/viewvc?rev=586692&view=rev
Log:
SM-1110: ServiceMix is not sending a response back to the calling Service in an In-Out Message Exchange
Added:
incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
Modified:
incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=586692&r1=586691&r2=586692&view=diff
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Sat Oct 20 02:48:00 2007
@@ -19,21 +19,32 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;
+import java.util.MissingResourceException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
+import javax.jbi.JBIException;
import javax.jbi.component.ComponentContext;
+import javax.jbi.management.MBeanNames;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.management.MBeanServer;
+import javax.naming.InitialContext;
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.DocumentFragment;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.jexl.Expression;
@@ -75,7 +86,7 @@
private String beanClassName;
private MethodInvocationStrategy methodInvocationStrategy;
private org.apache.servicemix.expression.Expression correlationExpression;
-
+
private Map<String, Holder> exchanges = new ConcurrentHashMap<String, Holder>();
private Map<Object, Request> requests = new ConcurrentHashMap<Object, Request>();
private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
@@ -202,7 +213,7 @@
throw new IllegalStateException("Unknown role: " + exchange.getRole());
}
}
-
+
protected void onProviderExchange(MessageExchange exchange) throws Exception {
Object corId = getCorrelation(exchange);
Request req = requests.get(corId);
@@ -259,7 +270,7 @@
currentRequest.set(null);
}
}
-
+
protected void onConsumerExchange(MessageExchange exchange) throws Exception {
Object corId = exchange.getExchangeId();
Request req = requests.remove(corId);
@@ -282,7 +293,7 @@
checkEndOfRequest(req, corId);
currentRequest.set(null);
}
-
+
protected Object getCorrelation(MessageExchange exchange) throws MessagingException {
return getCorrelationExpression().evaluate(exchange, exchange.getMessage("in"));
}
@@ -321,6 +332,8 @@
* @param target the bean to be injected
*/
protected void injectBean(final Object target) {
+ final PojoContext ctx = new PojoContext();
+ final DeliveryChannel ch = ctx.channel;
// Inject fields
ReflectionUtils.doWithFields(target.getClass(), new ReflectionUtils.FieldCallback() {
public void doWith(Field f) throws IllegalArgumentException, IllegalAccessException {
@@ -330,9 +343,9 @@
}
if (f.getAnnotation(Resource.class) != null) {
if (ComponentContext.class.isAssignableFrom(f.getType())) {
- ReflectionUtils.setField(f, target, context);
+ ReflectionUtils.setField(f, target, ctx);
} else if (DeliveryChannel.class.isAssignableFrom(f.getType())) {
- ReflectionUtils.setField(f, target, channel);
+ ReflectionUtils.setField(f, target, ch);
}
}
}
@@ -368,7 +381,7 @@
}
});
}
-
+
/**
* Used by POJOs acting as a consumer
* @param uri
@@ -380,7 +393,7 @@
InOut me = getExchangeFactory().createInOutExchange();
URIResolver.configureExchange(me, getServiceUnit().getComponent().getComponentContext(), uri);
MessageUtil.transferTo(message, me, "in");
- final Holder h = new Holder();
+ final Holder h = new Holder();
requests.put(me.getExchangeId(), currentRequest.get());
exchanges.put(me.getExchangeId(), h);
BeanEndpoint.this.send(me);
@@ -389,7 +402,7 @@
throw new RuntimeException(e);
}
}
-
+
protected void checkEndOfRequest(Request request, Object corId) {
if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
ReflectionUtils.callLifecycleMethod(request.getBean(), PreDestroy.class);
@@ -430,5 +443,138 @@
*/
public void setCorrelationExpression(org.apache.servicemix.expression.Expression correlationExpression) {
this.correlationExpression = correlationExpression;
+ }
+
+ protected class PojoContext implements ComponentContext {
+
+ private DeliveryChannel channel = new PojoChannel();
+
+ public ServiceEndpoint activateEndpoint(QName qName, String s) throws JBIException {
+ return context.activateEndpoint(qName, s);
+ }
+
+ public void deactivateEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException {
+ context.deactivateEndpoint(serviceEndpoint);
+ }
+
+ public void registerExternalEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException {
+ context.registerExternalEndpoint(serviceEndpoint);
+ }
+
+ public void deregisterExternalEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException {
+ context.deregisterExternalEndpoint(serviceEndpoint);
+ }
+
+ public ServiceEndpoint resolveEndpointReference(DocumentFragment documentFragment) {
+ return context.resolveEndpointReference(documentFragment);
+ }
+
+ public String getComponentName() {
+ return context.getComponentName();
+ }
+
+ public DeliveryChannel getDeliveryChannel() throws MessagingException {
+ return channel;
+ }
+
+ public ServiceEndpoint getEndpoint(QName qName, String s) {
+ return context.getEndpoint(qName, s);
+ }
+
+ public Document getEndpointDescriptor(ServiceEndpoint serviceEndpoint) throws JBIException {
+ return context.getEndpointDescriptor(serviceEndpoint);
+ }
+
+ public ServiceEndpoint[] getEndpoints(QName qName) {
+ return context.getEndpoints(qName);
+ }
+
+ public ServiceEndpoint[] getEndpointsForService(QName qName) {
+ return context.getEndpointsForService(qName);
+ }
+
+ public ServiceEndpoint[] getExternalEndpoints(QName qName) {
+ return context.getExternalEndpoints(qName);
+ }
+
+ public ServiceEndpoint[] getExternalEndpointsForService(QName qName) {
+ return context.getExternalEndpointsForService(qName);
+ }
+
+ public String getInstallRoot() {
+ return context.getInstallRoot();
+ }
+
+ public Logger getLogger(String s, String s1) throws MissingResourceException, JBIException {
+ return context.getLogger(s, s1);
+ }
+
+ public MBeanNames getMBeanNames() {
+ return context.getMBeanNames();
+ }
+
+ public MBeanServer getMBeanServer() {
+ return context.getMBeanServer();
+ }
+
+ public InitialContext getNamingContext() {
+ return context.getNamingContext();
+ }
+
+ public Object getTransactionManager() {
+ return context.getTransactionManager();
+ }
+
+ public String getWorkspaceRoot() {
+ return context.getWorkspaceRoot();
+ }
+ }
+
+ protected class PojoChannel implements DeliveryChannel {
+
+ public void close() throws MessagingException {
+ BeanEndpoint.this.channel.close();
+ }
+
+ public MessageExchangeFactory createExchangeFactory() {
+ return BeanEndpoint.this.channel.createExchangeFactory();
+ }
+
+ public MessageExchangeFactory createExchangeFactory(QName qName) {
+ return BeanEndpoint.this.channel.createExchangeFactory(qName);
+ }
+
+ public MessageExchangeFactory createExchangeFactoryForService(QName qName) {
+ return BeanEndpoint.this.channel.createExchangeFactoryForService(qName);
+ }
+
+ public MessageExchangeFactory createExchangeFactory(ServiceEndpoint serviceEndpoint) {
+ return BeanEndpoint.this.channel.createExchangeFactory(serviceEndpoint);
+ }
+
+ public MessageExchange accept() throws MessagingException {
+ return BeanEndpoint.this.channel.accept();
+ }
+
+ public MessageExchange accept(long l) throws MessagingException {
+ return BeanEndpoint.this.channel.accept(l);
+ }
+
+ public void send(MessageExchange messageExchange) throws MessagingException {
+ if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
+ && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+ requests.put(messageExchange.getExchangeId(), currentRequest.get());
+ }
+ BeanEndpoint.this.channel.send(messageExchange);
+ }
+
+ public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
+ return BeanEndpoint.this.channel.sendSync(messageExchange);
+ }
+
+ public boolean sendSync(MessageExchange messageExchange, long l) throws MessagingException {
+ return BeanEndpoint.this.channel.sendSync(messageExchange, l);
+ }
+
}
}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java?rev=586692&view=auto
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java Sat Oct 20 02:48:00 2007
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.beans.ConsumerListener;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+
+public class ConsumerListenerTest extends TestCase {
+
+ protected JBIContainer jbi;
+
+ protected void setUp() throws Exception {
+ jbi = new JBIContainer();
+ jbi.setEmbedded(true);
+ jbi.init();
+ }
+
+ public void test() throws Exception {
+ BeanComponent bc = new BeanComponent();
+ BeanEndpoint ep = new BeanEndpoint();
+ ep.setService(new QName("bean"));
+ ep.setEndpoint("endpoint");
+ ep.setBeanType(ConsumerListener.class);
+ bc.setEndpoints(new BeanEndpoint[] {ep });
+ jbi.activateComponent(bc, "servicemix-bean");
+
+ EchoComponent echo = new EchoComponent(new QName("echo"), "endpoint");
+ jbi.activateComponent(echo, "echo");
+
+ jbi.start();
+
+ ServiceMixClient client = new DefaultServiceMixClient(jbi);
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("bean"));
+ me.setOperation(new QName("receive"));
+ NormalizedMessage nm = me.getInMessage();
+ nm.setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(me);
+ assertExchangeWorked(me);
+ client.done(me);
+ }
+
+ protected void assertExchangeWorked(MessageExchange me) throws Exception {
+ if (me.getStatus() == ExchangeStatus.ERROR) {
+ if (me.getError() != null) {
+ throw me.getError();
+ } else {
+ fail("Received ERROR status");
+ }
+ } else if (me.getFault() != null) {
+ fail("Received fault: " + new SourceTransformer().toString(me.getFault().getContent()));
+ }
+ }
+
+}
\ No newline at end of file
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java?rev=586692&r1=586691&r2=586692&view=diff
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java Sat Oct 20 02:48:00 2007
@@ -18,10 +18,12 @@
import javax.annotation.Resource;
import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.jbi.util.MessageUtil;
@@ -32,14 +34,28 @@
private DeliveryChannel channel;
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
- MessageExchangeFactory factory = channel.createExchangeFactory();
- InOut io = factory.createInOutExchange();
- try {
- MessageUtil.transferInToIn(exchange, io);
- } catch (MessagingException e) {
- throw e;
- } catch (Exception e) {
- throw new MessagingException(e);
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ MessageExchange io = (MessageExchange) exchange.getProperty("exchange");
+ MessageUtil.transferOutToOut(exchange, io);
+ io.setProperty("exchange", exchange);
+ channel.send(io);
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ MessageExchange io = (MessageExchange) exchange.getProperty("exchange");
+ io.setStatus(ExchangeStatus.DONE);
+ channel.send(io);
+ }
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ MessageExchangeFactory factory = channel.createExchangeFactory();
+ InOut io = factory.createInOutExchange();
+ MessageUtil.transferInToIn(exchange, io);
+ io.setService(new QName("echo"));
+ io.setProperty("exchange", exchange);
+ channel.send(io);
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // Do nothing
+ }
}
}