You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/14 10:01:33 UTC

svn commit: r713955 - in /servicemix/components/engines/servicemix-bean/trunk: pom.xml src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java

Author: gnodet
Date: Fri Nov 14 01:01:32 2008
New Revision: 713955

URL: http://svn.apache.org/viewvc?rev=713955&view=rev
Log:
SM-1668, SM-1604: Fix TransformBeanSupport processing of faults and RobustInOnly MEPs

Modified:
    servicemix/components/engines/servicemix-bean/trunk/pom.xml
    servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java

Modified: servicemix/components/engines/servicemix-bean/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/pom.xml?rev=713955&r1=713954&r2=713955&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/pom.xml (original)
+++ servicemix/components/engines/servicemix-bean/trunk/pom.xml Fri Nov 14 01:01:32 2008
@@ -36,7 +36,7 @@
 
   <properties>
     <previous.releases>3.1.2,3.2,3.2.1</previous.releases>
-    <servicemix-version>3.2.1</servicemix-version>
+    <servicemix-version>3.3</servicemix-version>
     <servicemix-shared-version>2008.02-SNAPSHOT</servicemix-shared-version>
     <servicemix.osgi.import>
       org.apache.commons.jexl*;resolution:=optional,

Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java?rev=713955&r1=713954&r2=713955&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java Fri Nov 14 01:01:32 2008
@@ -16,18 +16,21 @@
  */
 package org.apache.servicemix.bean.support;
 
+import java.net.URI;
+
+import javax.annotation.PostConstruct;
 import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
-import javax.annotation.PostConstruct;
 
 import org.apache.servicemix.common.JbiConstants;
+import org.apache.servicemix.common.util.MessageUtil;
 import org.apache.servicemix.jbi.listener.MessageExchangeListener;
 import org.apache.servicemix.jbi.transformer.CopyTransformer;
-import org.apache.servicemix.store.StoreFactory;
 import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
 import org.apache.servicemix.store.memory.MemoryStoreFactory;
 
 /**
@@ -37,6 +40,8 @@
  */
 public abstract class TransformBeanSupport extends BeanSupport implements MessageExchangeListener {
     
+    private String correlation;
+    
     private ExchangeTarget target;
 
     private boolean copyProperties = true;
@@ -110,25 +115,60 @@
             }
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
+        correlation = "TransformBeanSupport.Correlation." + getService() + "." + getEndpoint();
     }
 
     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-        // Handle consumer exchanges
-        if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+        // Handle consumer exchanges && non-active RobustInOnly provider exchanges
+        if (exchange.getRole() == MessageExchange.Role.CONSUMER
+            || exchange.getProperty(correlation) != null) {
             MessageExchange original = null;
+            String id = null;
             try {
-                original = (MessageExchange) store.load(exchange.getExchangeId());
+                id = (String) exchange.getProperty(correlation);
+                original = (MessageExchange) store.load(id);
             } catch (Exception e) {
                 // We can't do, so just return
                 return;
             }
-            if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                original.setStatus(ExchangeStatus.ERROR);
-                original.setError(exchange.getError());
-                send(original);
+            try {
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    done(original);
+                // Reproduce ERROR status to the other side
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    fail(original, exchange.getError());
+                // Reproduce faults to the other side and listeners
+                } else if (exchange.getFault() != null) {
+                    store.store(exchange.getExchangeId(), exchange);
+                    try {
+                        MessageUtil.transferTo(exchange, original, "fault"); 
+                        send(original);
+                    } catch (Exception e) {
+                        store.load(exchange.getExchangeId());
+                        throw e;
+                    }
+                // Reproduce answers to the other side
+                } else if (exchange.getMessage("out") != null) {
+                    store.store(exchange.getExchangeId(), exchange);
+                    try {
+                        MessageUtil.transferTo(exchange, original, "out"); 
+                        send(original);
+                    } catch (Exception e) {
+                        store.load(exchange.getExchangeId());
+                        throw e;
+                    }
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+                            + " but has no Out nor Fault message");
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Original error: " + e, e);
+                }
             }
             return;
         }
+        
         // Skip done exchanges
         if (exchange.getStatus() == ExchangeStatus.DONE) {
             return;
@@ -137,18 +177,22 @@
             return;
         }
         try {
-            InOnly outExchange = null;
+            MessageExchange outExchange = null;
             NormalizedMessage in = getInMessage(exchange);
             NormalizedMessage out;
             if (isInAndOut(exchange)) {
                 out = exchange.createMessage();
             } else {
+                URI pattern = exchange.getPattern();
                 if (target == null) {
-                    throw new IllegalStateException("An IN-ONLY TransformBean has no Target specified");
+                    throw new IllegalStateException("A TransformBean with MEP " + pattern + " has no Target specified");
                 }
-                outExchange = getExchangeFactory().createInOnlyExchange();
+                outExchange = getExchangeFactory().createExchange(pattern);
                 target.configureTarget(outExchange, getContext());
                 outExchange.setProperty(JbiConstants.SENDER_ENDPOINT, getService() + ":" + getEndpoint());
+                // Set correlations
+                outExchange.setProperty(correlation, exchange.getExchangeId());
+                exchange.setProperty(correlation, outExchange.getExchangeId());
                 String processCorrelationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
                 if (processCorrelationId != null) {
                     outExchange.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
@@ -169,17 +213,28 @@
                     outExchange.setMessage(out, "in");
                     if (txSync) {
                         sendSync(outExchange);
-                        if (outExchange.getStatus() == ExchangeStatus.ERROR) {
-                            exchange.setStatus(ExchangeStatus.ERROR);
-                            exchange.setError(outExchange.getError());
-                            send(exchange);
+                        if (outExchange.getStatus() == ExchangeStatus.DONE) {
+                            done(exchange);
+                        } else if (outExchange.getStatus() == ExchangeStatus.ERROR) {
+                            fail(exchange, outExchange.getError());
+                        } else if (outExchange.getFault() != null) {
+                            Fault fault = MessageUtil.copyFault(outExchange);
+                            done(outExchange);
+                            MessageUtil.transferToFault(fault, exchange);
+                            sendSync(exchange);
                         } else {
-                            exchange.setStatus(ExchangeStatus.DONE);
-                            send(exchange);
+                            done(outExchange);
+                            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+                                    + " but has no Out nor Fault message");
                         }
                     } else {
-                        store.store(outExchange.getExchangeId(), exchange);
-                        send(outExchange);
+                        store.store(exchange.getExchangeId(), exchange);
+                        try {
+                            send(outExchange);
+                        } catch (Exception e) {
+                            store.load(exchange.getExchangeId());
+                            throw e;
+                        }
                     }
                 }
             } else {

Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java?rev=713955&r1=713954&r2=713955&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java Fri Nov 14 01:01:32 2008
@@ -16,72 +16,157 @@
  */
 package org.apache.servicemix.bean;
 
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
-import javax.jbi.component.Component;
+import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
 import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.support.ExchangeTarget;
 import org.apache.servicemix.bean.support.TransformBeanSupport;
-import org.apache.servicemix.bean.pojos.LoggingPojo;
+import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.common.util.MessageUtil;
+import org.apache.servicemix.components.util.ComponentSupport;
 import org.apache.servicemix.jbi.container.JBIContainer;
-import org.apache.servicemix.jbi.listener.MessageExchangeListener;
 import org.apache.servicemix.jbi.jaxp.StringSource;
-import org.apache.servicemix.components.util.EchoComponent;
-import org.apache.servicemix.components.util.TransformComponentSupport;
-import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.bean.support.ExchangeTarget;
-import org.apache.servicemix.client.ServiceMixClient;
-import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.servicemix.tck.ReceiverComponent;
 
 public class TransformBeanSupportTest extends TestCase {
 
+    protected DefaultServiceMixClient client;
     protected JBIContainer container;
+    protected ExchangeCompletedListener listener;
     protected BeanComponent component;
 
     protected void setUp() throws Exception {
         container = new JBIContainer();
         container.setEmbedded(true);
+        container.setUseMBeanServer(false);
+        container.setCreateMBeanServer(false);
+        configureContainer();
+        listener = new ExchangeCompletedListener();
+        container.addListener(listener);
+
         container.init();
+        container.start();
 
         component = new BeanComponent();
         container.activateComponent(component, "servicemix-bean");
 
-        container.start();
+        client = new DefaultServiceMixClient(container);
     }
 
     protected void tearDown() throws Exception {
         container.shutDown();
+        listener.assertExchangeCompleted();
     }
 
-    public void testInOnlyWithError() throws Exception {
-        MyTransformer transformer = new MyTransformer();
-        ExchangeTarget target = new ExchangeTarget();
-        target.setService(new QName("error"));
-        transformer.setTarget(target);
-        BeanEndpoint transformEndpoint = new BeanEndpoint();
-        transformEndpoint.setBean(transformer);
-        transformEndpoint.setService(new QName("transform"));
-        transformEndpoint.setEndpoint("endpoint");
+    protected void configureContainer() throws Exception {
+        container.setFlowName("st");
+    }
+
+    ReceiverComponent receiver = new ReceiverComponent();
+
+    public void testInOnly() throws Exception {
+        TransformBeanSupport transformer = createTransformer("receiver");
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
         component.addEndpoint(transformEndpoint);
 
-        SendErrorComponent sendErrorComponent = new SendErrorComponent();
-        container.activateComponent(sendErrorComponent, "error");
+        ReceiverComponent receiver = new ReceiverComponent();
+        activateComponent(receiver, "receiver");
 
-        ServiceMixClient client = new DefaultServiceMixClient(container);
         MessageExchange io = client.createInOnlyExchange();
         io.setService(new QName("transform"));
         io.getMessage("in").setContent(new StringSource("<hello/>"));
         client.send(io);
+
         io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+
+        receiver.getMessageList().assertMessagesReceived(1);
+    }
 
+
+    public void testInOnlyWithError() throws Exception {
+        TransformBeanSupport transformer = createTransformer("error");
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+        component.addEndpoint(transformEndpoint);
+
+        activateComponent(new ReturnErrorComponent(), "error");
+
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+
+        io = client.receive();
         assertEquals(ExchangeStatus.ERROR, io.getStatus());
     }
 
+    public void testRobustInOnly() throws Exception {
+        TransformBeanSupport transformer = createTransformer("receiver");
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+        component.addEndpoint(transformEndpoint);
+
+        ReceiverComponent receiver = new ReceiverComponent();
+        activateComponent(receiver, "receiver");
+
+        MessageExchange io = client.createRobustInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+
+        receiver.getMessageList().assertMessagesReceived(1);
+    }
+
+    public void testRobustInOnlyWithFaultAndError() throws Exception {
+        TransformBeanSupport transformer = createTransformer("fault");
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+        component.addEndpoint(transformEndpoint);
+
+        activateComponent(new ReturnFaultComponent(), "fault");
+
+        MessageExchange io = client.createRobustInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        assertNotNull(io.getFault());
+        client.fail(io, new Exception("I do not like faults"));
+    }
+
+    private MyTransformer createTransformer(String targetService) {
+        MyTransformer transformer = new MyTransformer();
+        ExchangeTarget target = new ExchangeTarget();
+        target.setService(new QName(targetService));
+        transformer.setTarget(target);
+        return transformer;
+    }
+
+    private BeanEndpoint createBeanEndpoint(TransformBeanSupport transformer) {
+        BeanEndpoint transformEndpoint = new BeanEndpoint();
+        transformEndpoint.setBean(transformer);
+        transformEndpoint.setService(new QName("transform"));
+        transformEndpoint.setEndpoint("endpoint");
+        return transformEndpoint;
+    }
+
+    protected void activateComponent(ComponentSupport component, String name) throws Exception {
+        component.setService(new QName(name));
+        component.setEndpoint("endpoint");
+        container.activateComponent(component, name);
+    }
+
     public static class MyTransformer extends TransformBeanSupport {
         protected boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception {
             MessageUtil.transfer(in, out);
@@ -89,15 +174,23 @@
         }
     }
 
-    public static class SendErrorComponent extends ComponentSupport implements MessageExchangeListener {
-        public SendErrorComponent() {
-            setService(new QName("error"));
+    public static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+        	if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+	            fail(exchange, new Exception());
+        	}
         }
+    }
+
+    public static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
 
         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-            exchange.setStatus(ExchangeStatus.ERROR);
-            exchange.setError(new Exception());
-            send(exchange);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                Fault fault = exchange.createFault();
+                fault.setContent(new StringSource("<fault/>"));
+                fail(exchange, fault);
+            }
         }
     }