You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/01/11 20:19:58 UTC

svn commit: r733498 - in /servicemix/smx3/branches/servicemix-3.2: common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ core/servicemix-core/src/main/java/org/apache/servicemix/components/util/ core/servicemix-core/src/te...

Author: ccustine
Date: Sun Jan 11 11:19:58 2009
New Revision: 733498

URL: http://svn.apache.org/viewvc?rev=733498&view=rev
Log:
SM-1757 TransformComponentSupport does not handle errors nor does it support robust-in-only MEPs

Modified:
    servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java

Modified: servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java?rev=733498&r1=733497&r2=733498&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java Sun Jan 11 11:19:58 2009
@@ -22,8 +22,12 @@
 import java.util.Map;
 
 import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 import javax.xml.transform.stream.StreamSource;
@@ -32,10 +36,13 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
 import org.apache.servicemix.jbi.container.SpringJBIContainer;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.resolver.EndpointResolver;
+import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.tck.Receiver;
 import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
 import org.springframework.context.support.AbstractXmlApplicationContext;
@@ -47,6 +54,7 @@
     private static transient Log log = LogFactory.getLog(ServiceMixClientTest.class);
 
     protected AbstractXmlApplicationContext context;
+    protected SpringJBIContainer container;
     protected ServiceMixClient client;
     protected Receiver receiver;
 
@@ -70,6 +78,44 @@
         receiver.getMessageList().assertMessagesReceived(1);
     }
 
+    public void testSendWithErrorUsingJbiAPIs() throws Exception {
+
+        MessageExchange exchange = client.createInOnlyExchange();
+
+        NormalizedMessage message = exchange.getMessage("in");
+        message.setProperty("name", "James");
+        message.setContent(new StreamSource(new StringReader("<hello>world</hello>")));
+
+        activateComponent(new ReturnErrorComponent(), "error");
+        
+        QName service = new QName("error");
+        exchange.setService(service);
+        client.send(exchange);
+        
+        exchange = client.receive();
+        assertEquals(ExchangeStatus.ERROR, exchange.getStatus());
+    }
+
+    public void testSendWithFaultUsingJbiAPIs() throws Exception {
+
+        MessageExchange exchange = client.createRobustInOnlyExchange();
+
+        NormalizedMessage message = exchange.getMessage("in");
+        message.setProperty("name", "James");
+        message.setContent(new StreamSource(new StringReader("<hello>world</hello>")));
+
+        activateComponent(new ReturnFaultComponent(), "fault");
+        
+        QName service = new QName("fault");
+        exchange.setService(service);
+        client.send(exchange);
+        
+        exchange = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, exchange.getStatus());
+        assertNotNull(exchange.getFault());
+        client.done(exchange);
+    }
+
     public void testSendUsingMapAndPOJOsByServiceName() throws Exception {
 
         Map properties = new HashMap();
@@ -188,8 +234,8 @@
         // TODO
         //receiver = (Receiver) getBean("receiver");
 
-        SpringJBIContainer jbi = (SpringJBIContainer) getBean("jbi");
-        receiver = (Receiver) jbi.getBean("receiver");
+        container = (SpringJBIContainer) getBean("jbi");
+        receiver = (Receiver) container.getBean("receiver");
         assertNotNull("receiver not found in JBI container", receiver);
     }
 
@@ -211,4 +257,31 @@
         return new ClassPathXmlApplicationContext("org/apache/servicemix/components/groovy/example.xml");
 
     }
+
+    protected void activateComponent(ComponentSupport comp, String name) throws Exception {
+        comp.setService(new QName(name));
+        comp.setEndpoint("endpoint");
+        container.activateComponent(comp, name);
+    }
+    
+    public static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, new Exception());
+            }
+        }
+    }
+
+    public static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+        
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                Fault fault = exchange.createFault();
+                fault.setContent(new StringSource("<fault/>"));
+                fail(exchange, fault);
+            }
+        }
+    }
+    
 }

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java?rev=733498&r1=733497&r2=733498&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java Sun Jan 11 11:19:58 2009
@@ -16,8 +16,12 @@
  */
 package org.apache.servicemix.components.util;
 
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jbi.JBIException;
 import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
@@ -25,6 +29,10 @@
 
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.jbi.util.MessageUtil;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
 
 /**
  * A useful base class for a transform component.
@@ -33,8 +41,12 @@
  */
 public abstract class TransformComponentSupport extends ComponentSupport implements MessageExchangeListener {
     
+    private String correlation;
+    
     private boolean copyProperties = true;
     private boolean copyAttachments = true;
+    private StoreFactory storeFactory;
+    private Store store;
 
     protected TransformComponentSupport() {
     }
@@ -43,23 +55,30 @@
         super(service, endpoint);
     }
 
-    public void onMessageExchange(MessageExchange exchange) {
-        // Skip done exchanges
-        if (exchange.getStatus() == ExchangeStatus.DONE) {
-            return;
-        // Handle error exchanges
-        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-            return;
+    public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+        // Handle consumer exchanges && non-active RobustInOnly provider exchanges
+        if (exchange.getRole() == MessageExchange.Role.CONSUMER
+                || exchange.getProperty(correlation) != null) {
+            processOngoingExchange(exchange);
+        } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            processFirstExchange(exchange);
         }
+    }
+
+    protected void processFirstExchange(MessageExchange exchange) {
         try {
-            InOnly outExchange = null;
+            MessageExchange outExchange = null;
             NormalizedMessage in = getInMessage(exchange);
             NormalizedMessage out;
             if (isInAndOut(exchange)) {
                 out = exchange.createMessage();
             } else {
-                outExchange = getExchangeFactory().createInOnlyExchange();
+                URI pattern = exchange.getPattern();
+                outExchange = getExchangeFactory().createExchange(pattern);
                 outExchange.setProperty(JbiConstants.SENDER_ENDPOINT, getService() + ":" + getEndpoint());
+                // Set correlations
+                outExchange.setProperty(correlation, exchange.getExchangeId());
+                exchange.setProperty(correlation, outExchange.getExchangeId());
                 String processCorrelationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
                 if (processCorrelationId != null) {
                     outExchange.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
@@ -72,23 +91,41 @@
                 if (isInAndOut(exchange)) {
                     exchange.setMessage(out, "out");
                     if (txSync) {
-                        getDeliveryChannel().sendSync(exchange);
+                        sendSync(exchange);
                     } else {
-                        getDeliveryChannel().send(exchange);
+                        send(exchange);
                     }
                 } else {
                     outExchange.setMessage(out, "in");
                     if (txSync) {
-                        getDeliveryChannel().sendSync(outExchange);
+                        sendSync(outExchange);
+                        if (outExchange.getStatus() == ExchangeStatus.DONE) {
+                            done(exchange);
+                        } else if (outExchange.getStatus() == ExchangeStatus.ERROR) {
+                            fail(exchange, outExchange.getError());
+                        } else if (outExchange.getFault() != null) {
+                            Fault fault = MessageUtil.copyFault(outExchange);
+                            done(outExchange);
+                            MessageUtil.transferToFault(fault, exchange);
+                            sendSync(exchange);
+                        } else {
+                            done(outExchange);
+                            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+                                    + " but has no Out nor Fault message");
+                        }
                     } else {
-                        getDeliveryChannel().send(outExchange);
+                        store.store(exchange.getExchangeId(), exchange);
+                        try {
+                            send(outExchange);
+                        } catch (Exception e) {
+                            store.load(exchange.getExchangeId());
+                            throw e;
+                        }
                     }
-                    exchange.setStatus(ExchangeStatus.DONE);
-                    getDeliveryChannel().send(exchange);
                 }
             } else {
                 exchange.setStatus(ExchangeStatus.DONE);
-                getDeliveryChannel().send(exchange);
+                send(exchange);
             }
         } catch (Exception e) {
             try {
@@ -102,15 +139,79 @@
         }
     }
 
+    protected void processOngoingExchange(MessageExchange exchange) {
+        MessageExchange original = null;
+        String id = null;
+        try {
+            id = (String) exchange.getProperty(correlation);
+            original = (MessageExchange) store.load(id);
+        } catch (Exception e) {
+            // We can't do, so just return
+            return;
+        }
+        try {
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                done(original);
+            // Reproduce ERROR status to the other side
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                fail(original, exchange.getError());
+            // Reproduce faults to the other side and listeners
+            } else if (exchange.getFault() != null) {
+                store.store(exchange.getExchangeId(), exchange);
+                try {
+                    MessageUtil.transferTo(exchange, original, "fault");
+                    send(original);
+                } catch (Exception e) {
+                    store.load(exchange.getExchangeId());
+                    throw e;
+                }
+            // Reproduce answers to the other side
+            } else if (exchange.getMessage("out") != null) {
+                store.store(exchange.getExchangeId(), exchange);
+                try {
+                    MessageUtil.transferTo(exchange, original, "out");
+                    send(original);
+                } catch (Exception e) {
+                    store.load(exchange.getExchangeId());
+                    throw e;
+                }
+            } else {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+                        + " but has no Out nor Fault message");
+            }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Original error: " + e, e);
+            }
+        }
+    }
+    
 
     // Implementation methods
     //-------------------------------------------------------------------------
 
+    
     /**
      * Transforms the given out message
      */
     protected abstract boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception;
 
+    @Override
+    protected void init() throws JBIException {
+        super.init();
+        if (store == null) {
+            if (storeFactory == null) {
+                storeFactory = new MemoryStoreFactory();
+            }
+            try {
+                store = storeFactory.open(getService().toString() + getEndpoint());
+            } catch (IOException e) {
+                throw new JBIException("Unable to open storeFactory" + e.getMessage(), e);
+            }
+        }
+        correlation = "TransformComponentSupport.Correlation." + getService() + "." + getEndpoint();
+        
+    }
 
     public boolean isCopyProperties() {
         return copyProperties;
@@ -150,4 +251,21 @@
             CopyTransformer.copyAttachments(in, out);
         }
     }
+    
+    public StoreFactory getStoreFactory() {
+        return storeFactory;
+    }
+
+    public void setStoreFactory(StoreFactory storeFactory) {
+        this.storeFactory = storeFactory;
+    }
+
+    public Store getStore() {
+        return store;
+    }
+
+    public void setStore(Store store) {
+        this.store = store;
+    }
+    
 }

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java?rev=733498&r1=733497&r2=733498&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java Sun Jan 11 11:19:58 2009
@@ -36,6 +36,7 @@
         container.start();
         
         EchoComponent component = new EchoComponent();
+        component.setService(new QName("http://foo.bar.com", "myService"));
         container.activateComponent(component, "component");
         ServiceEndpoint ep = component.getContext().activateEndpoint(new QName("http://foo.bar.com", "myService"), "myEndpoint");
         DocumentFragment epr = ep.getAsReference(null);
@@ -50,6 +51,7 @@
         container.start();
         
         EchoComponent component = new EchoComponent();
+        component.setService(new QName("http://foo.bar.com", "myService"));
         container.activateComponent(component, "component");
         ServiceEndpoint ep = component.getContext().activateEndpoint(new QName("http://foo.bar.com", "myService"), "myEndpoint");
         DocumentFragment epr = URIResolver.createWSAEPR("endpoint:http://foo.bar.com/myService/myEndpoint");