You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/05 12:57:20 UTC

svn commit: r411741 [1/2] - in /incubator/servicemix/trunk/servicemix-eip: ./ src/main/java/org/apache/servicemix/eip/ src/main/java/org/apache/servicemix/eip/patterns/ src/main/java/org/apache/servicemix/eip/support/ src/test/java/org/apache/servicemi...

Author: gnodet
Date: Mon Jun  5 03:57:19 2006
New Revision: 411741

URL: http://svn.apache.org/viewvc?rev=411741&view=rev
Log:
Support xa transactions on seda flow (using sendSync)

Added:
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTransactionalTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/ContentBasedRouterTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/MessageFilterTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRecipientListTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRoutingSlipTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTxTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/XPathSplitterTxTest.java
Modified:
    incubator/servicemix/trunk/servicemix-eip/pom.xml
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java

Modified: incubator/servicemix/trunk/servicemix-eip/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/pom.xml?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/pom.xml (original)
+++ incubator/servicemix/trunk/servicemix-eip/pom.xml Mon Jun  5 03:57:19 2006
@@ -53,6 +53,21 @@
       <artifactId>log4j</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>incubator-activemq</groupId>
+      <artifactId>activemq-ra</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>tranql</groupId>
+      <artifactId>tranql-connector</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java Mon Jun  5 03:57:19 2006
@@ -25,6 +25,7 @@
 import javax.jbi.messaging.MessageExchange.Role;
 import javax.jbi.servicedesc.ServiceEndpoint;
 
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.common.BaseLifeCycle;
 import org.apache.servicemix.common.Endpoint;
 import org.apache.servicemix.common.ExchangeProcessor;
@@ -188,6 +189,12 @@
         }
     }
     
+    protected void sendSync(MessageExchange me) throws MessagingException {
+        if (!channel.sendSync(me)) {
+            throw new MessagingException("SendSync failed");
+        }
+    }
+    
     protected void done(MessageExchange me) throws MessagingException {
         me.setStatus(ExchangeStatus.DONE);
         send(me);
@@ -203,5 +210,21 @@
     
     public void stop() {
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     */
+    public void process(MessageExchange exchange) throws Exception {
+        boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+        if (txSync && exchange.getRole() == Role.PROVIDER && exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            processSync(exchange);
+        } else {
+            processAsync(exchange);
+        }
+    }
+    
+    protected abstract void processAsync(MessageExchange exchange) throws Exception;
+
+    protected abstract void processSync(MessageExchange exchange) throws Exception;
 
 }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java Mon Jun  5 03:57:19 2006
@@ -17,12 +17,11 @@
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
@@ -43,9 +42,6 @@
  */
 public class ContentBasedRouter extends EIPEndpoint {
 
-    private static final Log log = LogFactory.getLog(WireTap.class);
-    
-    
     /**
      * Routing rules that are evaluated to find the target destination
      */
@@ -83,69 +79,97 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processSync(MessageExchange exchange) throws Exception {
+        // Create exchange for target
+        MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+        // Now copy input to new exchange
+        // We need to read the message once for finding routing target
+        // so ensure we have a re-readable source
+        NormalizedMessage in = MessageUtil.copyIn(exchange);
+        MessageUtil.transferToIn(in, tme); 
+        // Retrieve target
+        ExchangeTarget target = getDestination(tme);
+        target.configureTarget(tme, getContext());
+        // Send in to target
+        sendSync(tme);
+        // Send back the result
+        if (tme.getStatus() == ExchangeStatus.DONE) {
+            done(exchange);
+        } else if (tme.getStatus() == ExchangeStatus.ERROR) {
+            fail(exchange, tme.getError());
+        } else if (tme.getFault() != null) {
+            Fault fault = MessageUtil.copyFault(tme);
+            done(tme);
+            MessageUtil.transferToFault(fault, exchange);
+            sendSync(exchange);
+        } else if (tme.getMessage("out") != null) {
+            NormalizedMessage out = MessageUtil.copyOut(tme);
+            done(tme);
+            MessageUtil.transferToOut(out, exchange);
+            sendSync(exchange);
+        } else {
+            done(tme);
+            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws MessagingException {
-        try {
-            if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
-                exchange.getProperty(correlation) == null) {
-                // Create exchange for target
-                MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
-                if (store.hasFeature(Store.CLUSTERED)) {
-                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
-                    tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
-                }
-                // Set correlations
-                tme.setProperty(correlation, exchange.getExchangeId());
-                exchange.setProperty(correlation, tme.getExchangeId());
-                // Put exchange to store
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+            exchange.getProperty(correlation) == null) {
+            // Create exchange for target
+            MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+            if (store.hasFeature(Store.CLUSTERED)) {
+                exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+                tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+            }
+            // Set correlations
+            tme.setProperty(correlation, exchange.getExchangeId());
+            exchange.setProperty(correlation, tme.getExchangeId());
+            // Put exchange to store
+            store.store(exchange.getExchangeId(), exchange);
+            // Now copy input to new exchange
+            // We need to read the message once for finding routing target
+            // so ensure we have a re-readable source
+            NormalizedMessage in = MessageUtil.copyIn(exchange);
+            MessageUtil.transferToIn(in, tme); 
+            // Retrieve target
+            ExchangeTarget target = getDestination(tme);
+            target.configureTarget(tme, getContext());
+            // Send in to target
+            send(tme);
+        // Mimic the exchange on the other side and send to needed listener
+        } else {
+            String id = (String) exchange.getProperty(correlation);
+            if (id == null) {
+                throw new IllegalStateException(correlation + " property not found");
+            }
+            MessageExchange org = (MessageExchange) store.load(id);
+            if (org == null) {
+                throw new IllegalStateException("Could not load original exchange with id " + id);
+            }
+            // Reproduce DONE status to the other side
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                done(org);
+            // Reproduce ERROR status to the other side
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                fail(org, exchange.getError());
+            // Reproduce faults to the other side and listeners
+            } else if (exchange.getFault() != null) {
+                store.store(exchange.getExchangeId(), exchange);
+                MessageUtil.transferTo(exchange, org, "fault"); 
+                send(org);
+            // Reproduce answers to the other side
+            } else if (exchange.getMessage("out") != null) {
                 store.store(exchange.getExchangeId(), exchange);
-                // Now copy input to new exchange
-                // We need to read the message once for finding routing target
-                // so ensure we have a re-readable source
-                NormalizedMessage in = MessageUtil.copyIn(exchange);
-                MessageUtil.transferToIn(in, tme); 
-                // Retrieve target
-                ExchangeTarget target = getDestination(tme);
-                target.configureTarget(tme, getContext());
-                // Send in to target
-                send(tme);
-            // Mimic the exchange on the other side and send to needed listener
+                MessageUtil.transferTo(exchange, org, "out"); 
+                send(org);
             } else {
-                String id = (String) exchange.getProperty(correlation);
-                if (id == null) {
-                    throw new IllegalStateException(correlation + " property not found");
-                }
-                MessageExchange org = (MessageExchange) store.load(id);
-                if (org == null) {
-                    throw new IllegalStateException("Could not load original exchange with id " + id);
-                }
-                // Reproduce DONE status to the other side
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    done(org);
-                // Reproduce ERROR status to the other side
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    fail(org, exchange.getError());
-                // Reproduce faults to the other side and listeners
-                } else if (exchange.getFault() != null) {
-                    store.store(exchange.getExchangeId(), exchange);
-                    MessageUtil.transferTo(exchange, org, "fault"); 
-                    send(org);
-                // Reproduce answers to the other side
-                } else if (exchange.getMessage("out") != null) {
-                    store.store(exchange.getExchangeId(), exchange);
-                    MessageUtil.transferTo(exchange, org, "out"); 
-                    send(org);
-                } else {
-                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
-                }
-            }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java Mon Jun  5 03:57:19 2006
@@ -17,13 +17,12 @@
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.eip.support.MessageUtil;
@@ -42,8 +41,6 @@
  */
 public class MessageFilter extends EIPEndpoint {
 
-    private static final Log log = LogFactory.getLog(MessageFilter.class);
-    
     /**
      * The main target destination which will receive the exchange
      */
@@ -127,46 +124,67 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws Exception {
-        try {
-            // If we need to report errors, the behavior is really different,
-            // as we need to keep the incoming exchange in the store until
-            // all acks have been received
-            if (reportErrors) {
-                // TODO: implement this
-                throw new UnsupportedOperationException("Not implemented");
-            // We are in a simple fire-and-forget behaviour.
-            // This implementation is really efficient as we do not use
-            // the store at all.
-            } else {
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    return;
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    return;
-                } else if (exchange instanceof InOnly == false &&
-                           exchange instanceof RobustInOnly == false) {
-                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-                } else if (exchange.getFault() != null) {
-                    done(exchange);
-                } else {
-                    NormalizedMessage in = MessageUtil.copyIn(exchange);
-                    MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
-                    target.configureTarget(me, getContext());
-                    MessageUtil.transferToIn(in, me);
-                    if (filter.matches(me)) {
-                        send(me);
-                    }
+    protected void processSync(MessageExchange exchange) throws Exception {
+        if (exchange instanceof InOnly == false &&
+            exchange instanceof RobustInOnly == false) {
+            fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+        } else {
+            NormalizedMessage in = MessageUtil.copyIn(exchange);
+            MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+            target.configureTarget(me, getContext());
+            MessageUtil.transferToIn(in, me);
+            if (filter.matches(me)) {
+                sendSync(me);
+                if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
+                    fail(exchange, me.getError());
+                } else if (me.getStatus() == ExchangeStatus.DONE) {
                     done(exchange);
+                } else if (me.getFault() != null && reportErrors) {
+                    Fault fault = MessageUtil.copyFault(me);
+                    done(me);
+                    MessageUtil.transferToFault(fault, exchange);
+                    sendSync(exchange);
                 }
+            } else {
+                done(exchange);
             }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        // If we need to report errors, the behavior is really different,
+        // as we need to keep the incoming exchange in the store until
+        // all acks have been received
+        if (reportErrors) {
+            // TODO: implement this
+            throw new UnsupportedOperationException("Not implemented");
+        // We are in a simple fire-and-forget behaviour.
+        // This implementation is really efficient as we do not use
+        // the store at all.
+        } else {
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            } else if (exchange instanceof InOnly == false &&
+                       exchange instanceof RobustInOnly == false) {
+                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            } else if (exchange.getFault() != null) {
+                done(exchange);
+            } else {
+                NormalizedMessage in = MessageUtil.copyIn(exchange);
+                MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+                target.configureTarget(me, getContext());
+                MessageUtil.transferToIn(in, me);
+                if (filter.matches(me)) {
+                    send(me);
+                }
+                done(exchange);
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java Mon Jun  5 03:57:19 2006
@@ -19,13 +19,12 @@
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.RobustInOnly;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.eip.support.MessageUtil;
@@ -48,8 +47,6 @@
  */
 public class Pipeline extends EIPEndpoint {
 
-    private static final Log log = LogFactory.getLog(Pipeline.class);
-
     private static final String TRANSFORMER = "Pipeline.Transformer";
     
     private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP";
@@ -127,152 +124,213 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws MessagingException {
-        try {
-            // The exchange comes from the consumer
-            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
-                // A DONE status from the consumer can only be received
-                // when a fault has been sent
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    String transformerId = (String) exchange.getProperty(correlationTransformer);
-                    String targetId = (String) exchange.getProperty(correlationTarget);
-                    if (transformerId == null && targetId == null) {
-                        throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
-                    }
-                    // Load the exchange
-                    MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
-                    done(me);
-                // Errors must be sent back to the target or transformer
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    String transformerId = (String) exchange.getProperty(correlationTransformer);
-                    String targetId = (String) exchange.getProperty(correlationTarget);
-                    if (transformerId == null && targetId == null) {
-                        throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
-                    }
-                    // Load the exchange
-                    MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
-                    fail(me, exchange.getError());
-                // This is a new exchange
-                } else if (exchange.getProperty(correlationTransformer) == null) {
-                    if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
-                        fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-                        return;
-                    }
-                    // Create exchange for target
-                    MessageExchange tme = exchangeFactory.createInOutExchange();
-                    transformer.configureTarget(tme, getContext());
-                    // Set correlations
-                    exchange.setProperty(correlationTransformer, tme.getExchangeId());
-                    tme.setProperty(correlationConsumer, exchange.getExchangeId());
-                    tme.setProperty(TRANSFORMER, Boolean.TRUE);
-                    tme.setProperty(CONSUMER_MEP, exchange.getPattern());
-                    // Put exchange to store
-                    store.store(exchange.getExchangeId(), exchange);
-                    // Send in to listener and target
-                    MessageUtil.transferInToIn(exchange, tme);
-                    send(tme);
-                } else {
-                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
-                }
-            // If the exchange comes from the transformer
-            } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
-                // Retrieve the correlation id
-                String consumerId = (String) exchange.getProperty(correlationConsumer);
-                if (consumerId == null) {
-                    throw new IllegalStateException(correlationConsumer + " property not found");
-                }
-                // This should not happen beacause the MEP is an In-Out
-                // and the DONE status is always sent by the consumer (us)
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    throw new IllegalStateException("Received a DONE status from the transformer");
-                // Errors must be sent back to the consumer
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    MessageExchange me = (MessageExchange) store.load(consumerId);
-                    fail(me, exchange.getError());
-                // Faults must be sent back to the consumer
-                } else if (exchange.getFault() != null) {
-                    MessageExchange me = (MessageExchange) store.load(consumerId);
-                    if (me instanceof InOnly) {
-                        // Do not use the fault has it may contain streams
-                        // So just transform it to a string and send an error
-                        String fault = new SourceTransformer().contentToString(exchange.getFault());
-                        fail(me, new FaultException(fault, null, null));
-                        done(exchange);
-                    } else {
-                        store.store(exchange.getExchangeId(), exchange);
-                        MessageUtil.transferFaultToFault(exchange, me);
-                        send(me);
-                    }
-                // This is the answer from the transformer
-                } else if (exchange.getMessage("out") != null) {
-                    // Retrieve the consumer MEP
-                    URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
-                    if (mep == null) {
-                        throw new IllegalStateException("Exchange does not carry the consumer MEP");
-                    }
-                    MessageExchange me = exchangeFactory.createExchange(mep);
-                    target.configureTarget(me, getContext());
-                    me.setProperty(correlationConsumer, consumerId);
-                    me.setProperty(correlationTransformer, exchange.getExchangeId());
-                    store.store(exchange.getExchangeId(), exchange);
-                    MessageUtil.transferOutToIn(exchange, me);
-                    send(me);
-                // This should not happen
-                } else {
-                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
-                }
-            // The exchange comes from the target
+    protected void processSync(MessageExchange exchange) throws Exception {
+        if (exchange instanceof InOnly == false &&
+            exchange instanceof RobustInOnly == false) {
+            fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            return;
+        }
+        // Create exchange for target
+        InOut tme = exchangeFactory.createInOutExchange();
+        transformer.configureTarget(tme, getContext());
+        // Send in to listener and target
+        MessageUtil.transferInToIn(exchange, tme);
+        sendSync(tme);
+        // Check result
+        if (tme.getStatus() == ExchangeStatus.DONE) {
+            throw new IllegalStateException("Received a DONE status from the transformer");
+        }
+        // Errors must be sent back to the consumer
+        else if (tme.getStatus() == ExchangeStatus.ERROR) {
+            fail(exchange, tme.getError());
+        }
+        // Faults must be sent back to the consumer
+        else if (tme.getFault() != null) {
+            if (exchange instanceof InOnly) {
+                // Do not use the fault has it may contain streams
+                // So just transform it to a string and send an error
+                String fault = new SourceTransformer().contentToString(tme.getFault());
+                done(tme);
+                fail(exchange, new FaultException(fault, null, null));
             } else {
-                // Retrieve the correlation id for the consumer
-                String consumerId = (String) exchange.getProperty(correlationConsumer);
-                if (consumerId == null) {
-                    throw new IllegalStateException(correlationConsumer + " property not found");
+                Fault fault = MessageUtil.copyFault(tme);
+                MessageUtil.transferToFault(fault, exchange);
+                done(tme);
+                sendSync(exchange);
+            }
+        }
+        // This should not happen
+        else if (tme.getOutMessage() == null) {
+            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+        }
+        // This is the answer from the transformer
+        MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+        target.configureTarget(me, getContext());
+        MessageUtil.transferOutToIn(tme, me);
+        sendSync(me);
+        done(tme);
+        if (me.getStatus() == ExchangeStatus.DONE) {
+            done(exchange);
+        } else if (me.getStatus() == ExchangeStatus.ERROR) {
+            fail(exchange, me.getError());
+        } else if (me.getFault() != null) {
+            if (exchange instanceof InOnly) {
+                // Do not use the fault has it may contain streams
+                // So just transform it to a string and send an error
+                String fault = new SourceTransformer().contentToString(me.getFault());
+                done(me);
+                fail(exchange, new FaultException(fault, null, null));
+            } else {
+                Fault fault = MessageUtil.copyFault(me);
+                MessageUtil.transferToFault(fault, exchange);
+                done(me);
+                sendSync(exchange);
+            }
+        } else {
+            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        // The exchange comes from the consumer
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            // A DONE status from the consumer can only be received
+            // when a fault has been sent
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                String transformerId = (String) exchange.getProperty(correlationTransformer);
+                String targetId = (String) exchange.getProperty(correlationTarget);
+                if (transformerId == null && targetId == null) {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
                 }
-                // Retrieve the correlation id for the transformer
+                // Load the exchange
+                MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+                done(me);
+            // Errors must be sent back to the target or transformer
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                 String transformerId = (String) exchange.getProperty(correlationTransformer);
-                if (transformerId == null) {
-                    throw new IllegalStateException(correlationTransformer + " property not found");
+                String targetId = (String) exchange.getProperty(correlationTarget);
+                if (transformerId == null && targetId == null) {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
                 }
-                // This should be the last message received
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    // Need to ack the transformer
-                    MessageExchange tme = (MessageExchange) store.load(transformerId);
-                    done(tme);
-                    // Need to ack the consumer
-                    MessageExchange cme = (MessageExchange) store.load(consumerId);
-                    done(cme);
-                // Errors should be sent back to the consumer
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    // Need to ack the transformer
-                    MessageExchange tme = (MessageExchange) store.load(transformerId);
-                    done(tme);
-                    // Send error to consumer
-                    MessageExchange cme = (MessageExchange) store.load(consumerId);
-                    fail(cme, exchange.getError());
-                // If we have a robust-in-only MEP, we can receive a fault
-                } else if (exchange.getFault() != null) {
-                    // Need to ack the transformer
-                    MessageExchange tme = (MessageExchange) store.load(transformerId);
-                    done(tme);
-                    // Send fault back to consumer
-                    store.store(exchange.getExchangeId(), exchange);
-                    MessageExchange cme = (MessageExchange) store.load(consumerId);
-                    cme.setProperty(correlationTarget, exchange.getExchangeId());
-                    MessageUtil.transferFaultToFault(exchange, cme);
-                    send(cme);
-                // This should not happen
+                // Load the exchange
+                MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+                fail(me, exchange.getError());
+            // This is a new exchange
+            } else if (exchange.getProperty(correlationTransformer) == null) {
+                if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
+                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+                    return;
+                }
+                // Create exchange for target
+                MessageExchange tme = exchangeFactory.createInOutExchange();
+                transformer.configureTarget(tme, getContext());
+                // Set correlations
+                exchange.setProperty(correlationTransformer, tme.getExchangeId());
+                tme.setProperty(correlationConsumer, exchange.getExchangeId());
+                tme.setProperty(TRANSFORMER, Boolean.TRUE);
+                tme.setProperty(CONSUMER_MEP, exchange.getPattern());
+                // Put exchange to store
+                store.store(exchange.getExchangeId(), exchange);
+                // Send in to listener and target
+                MessageUtil.transferInToIn(exchange, tme);
+                send(tme);
+            } else {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+            }
+        // If the exchange comes from the transformer
+        } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
+            // Retrieve the correlation id
+            String consumerId = (String) exchange.getProperty(correlationConsumer);
+            if (consumerId == null) {
+                throw new IllegalStateException(correlationConsumer + " property not found");
+            }
+            // This should not happen beacause the MEP is an In-Out
+            // and the DONE status is always sent by the consumer (us)
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                throw new IllegalStateException("Received a DONE status from the transformer");
+            // Errors must be sent back to the consumer
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                MessageExchange me = (MessageExchange) store.load(consumerId);
+                fail(me, exchange.getError());
+            // Faults must be sent back to the consumer
+            } else if (exchange.getFault() != null) {
+                MessageExchange me = (MessageExchange) store.load(consumerId);
+                if (me instanceof InOnly) {
+                    // Do not use the fault has it may contain streams
+                    // So just transform it to a string and send an error
+                    String fault = new SourceTransformer().contentToString(exchange.getFault());
+                    fail(me, new FaultException(fault, null, null));
+                    done(exchange);
                 } else {
-                    throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferFaultToFault(exchange, me);
+                    send(me);
+                }
+            // This is the answer from the transformer
+            } else if (exchange.getMessage("out") != null) {
+                // Retrieve the consumer MEP
+                URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
+                if (mep == null) {
+                    throw new IllegalStateException("Exchange does not carry the consumer MEP");
                 }
+                MessageExchange me = exchangeFactory.createExchange(mep);
+                target.configureTarget(me, getContext());
+                me.setProperty(correlationConsumer, consumerId);
+                me.setProperty(correlationTransformer, exchange.getExchangeId());
+                store.store(exchange.getExchangeId(), exchange);
+                MessageUtil.transferOutToIn(exchange, me);
+                send(me);
+            // This should not happen
+            } else {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+            }
+        // The exchange comes from the target
+        } else {
+            // Retrieve the correlation id for the consumer
+            String consumerId = (String) exchange.getProperty(correlationConsumer);
+            if (consumerId == null) {
+                throw new IllegalStateException(correlationConsumer + " property not found");
             }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+            // Retrieve the correlation id for the transformer
+            String transformerId = (String) exchange.getProperty(correlationTransformer);
+            if (transformerId == null) {
+                throw new IllegalStateException(correlationTransformer + " property not found");
+            }
+            // This should be the last message received
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                // Need to ack the transformer
+                MessageExchange tme = (MessageExchange) store.load(transformerId);
+                done(tme);
+                // Need to ack the consumer
+                MessageExchange cme = (MessageExchange) store.load(consumerId);
+                done(cme);
+            // Errors should be sent back to the consumer
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                // Need to ack the transformer
+                MessageExchange tme = (MessageExchange) store.load(transformerId);
+                done(tme);
+                // Send error to consumer
+                MessageExchange cme = (MessageExchange) store.load(consumerId);
+                fail(cme, exchange.getError());
+            // If we have a robust-in-only MEP, we can receive a fault
+            } else if (exchange.getFault() != null) {
+                // Need to ack the transformer
+                MessageExchange tme = (MessageExchange) store.load(transformerId);
+                done(tme);
+                // Send fault back to consumer
+                store.store(exchange.getExchangeId(), exchange);
+                MessageExchange cme = (MessageExchange) store.load(consumerId);
+                cme.setProperty(correlationTarget, exchange.getExchangeId());
+                MessageUtil.transferFaultToFault(exchange, cme);
+                send(cme);
+            // This should not happen
+            } else {
+                throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Mon Jun  5 03:57:19 2006
@@ -19,12 +19,9 @@
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.eip.support.MessageUtil;
@@ -43,8 +40,6 @@
  */
 public class StaticRecipientList extends EIPEndpoint {
 
-    private static final Log log = LogFactory.getLog(WireTap.class);
-
     /**
      * List of recipients
      */
@@ -106,46 +101,60 @@
     }
     
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws MessagingException {
-        try {
-            // If we need to report errors, the behavior is really different,
-            // as we need to keep the incoming exchange in the store until
-            // all acks have been received
-            if (reportErrors) {
-                // TODO: implement this
-                throw new UnsupportedOperationException("Not implemented");
-            // We are in a simple fire-and-forget behaviour.
-            // This implementation is really efficient as we do not use
-            // the store at all.
+    protected void processSync(MessageExchange exchange) throws Exception {
+        if (exchange instanceof InOnly == false &&
+            exchange instanceof RobustInOnly == false) {
+            fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            return;
+        }
+        NormalizedMessage in = MessageUtil.copyIn(exchange);
+        for (int i = 0; i < recipients.length; i++) {
+            MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+            recipients[i].configureTarget(me, getContext());
+            MessageUtil.transferToIn(in, me);
+            sendSync(me);
+            if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
+                fail(exchange, me.getError());
+                return;
+            }
+        }
+        done(exchange);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        // If we need to report errors, the behavior is really different,
+        // as we need to keep the incoming exchange in the store until
+        // all acks have been received
+        if (reportErrors) {
+            // TODO: implement this
+            throw new UnsupportedOperationException("Not implemented");
+        // We are in a simple fire-and-forget behaviour.
+        // This implementation is really efficient as we do not use
+        // the store at all.
+        } else {
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            } else if (exchange instanceof InOnly == false &&
+                       exchange instanceof RobustInOnly == false) {
+                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            } else if (exchange.getFault() != null) {
+                done(exchange);
             } else {
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    return;
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    return;
-                } else if (exchange instanceof InOnly == false &&
-                           exchange instanceof RobustInOnly == false) {
-                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-                } else if (exchange.getFault() != null) {
-                    done(exchange);
-                } else {
-                    NormalizedMessage in = MessageUtil.copyIn(exchange);
-                    for (int i = 0; i < recipients.length; i++) {
-                        MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
-                        recipients[i].configureTarget(me, getContext());
-                        MessageUtil.transferToIn(in, me);
-                        send(me);
-                    }
-                    done(exchange);
+                NormalizedMessage in = MessageUtil.copyIn(exchange);
+                for (int i = 0; i < recipients.length; i++) {
+                    MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+                    recipients[i].configureTarget(me, getContext());
+                    MessageUtil.transferToIn(in, me);
+                    send(me);
                 }
-            }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+                done(exchange);
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java Mon Jun  5 03:57:19 2006
@@ -17,12 +17,10 @@
 
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.eip.support.MessageUtil;
@@ -45,8 +43,6 @@
  */
 public class StaticRoutingSlip extends EIPEndpoint {
 
-    private static final Log log = LogFactory.getLog(Pipeline.class);
-
     /**
      * List of target components used in the RoutingSlip
      */
@@ -94,114 +90,146 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws MessagingException {
-        try {
-            // This exchange comes from the consumer
-            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    String correlationId = (String) exchange.getProperty(correlation);
-                    if (correlationId == null) {
-                        throw new IllegalStateException(correlation + " property not found");
-                    }
-                    // Ack last target hit
-                    MessageExchange me = (MessageExchange) store.load(correlationId);
-                    done(me);
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    String correlationId = (String) exchange.getProperty(correlation);
-                    if (correlationId == null) {
-                        throw new IllegalStateException(correlation + " property not found");
-                    }
-                    // Ack last target hit
-                    MessageExchange me = (MessageExchange) store.load(correlationId);
-                    done(me);
-                } else if (exchange instanceof InOut == false) {
-                    throw new IllegalStateException("Use an InOut MEP");
-                } else {
-                    MessageExchange me = exchangeFactory.createInOutExchange();
-                    me.setProperty(correlation, exchange.getExchangeId());
-                    me.setProperty(index, new Integer(0));
-                    targets[0].configureTarget(me, getContext());
-                    store.store(exchange.getExchangeId(), exchange);
-                    MessageUtil.transferInToIn(exchange, me);
-                    send(me);
-                }
-            // The exchange comes from a target
+    protected void processSync(MessageExchange exchange) throws Exception {
+        if (exchange instanceof InOut == false) {
+            throw new IllegalStateException("Use an InOut MEP");
+        }
+        MessageExchange current = exchange;
+        for (int i = 0; i < targets.length; i++) {
+            InOut me = exchangeFactory.createInOutExchange();
+            targets[i].configureTarget(me, getContext());
+            if (i == 0) {
+                MessageUtil.transferInToIn(current, me);
             } else {
+                MessageUtil.transferOutToIn(current, me);
+            }
+            sendSync(me);
+            if (i != 0) {
+                done(current);
+            }
+            if (me.getStatus() == ExchangeStatus.DONE) {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
+            } else if (me.getStatus() == ExchangeStatus.ERROR) {
+                fail(exchange, me.getError());
+                return;
+            } else if (me.getFault() != null) {
+                Fault fault = MessageUtil.copyFault(me);
+                MessageUtil.transferToFault(fault, exchange);
+                done(me);
+                sendSync(exchange);
+                return;
+            } else if (me.getOutMessage() == null) {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+            }
+            current = me;
+        }
+        MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange);
+        done(current);
+        sendSync(exchange);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        // This exchange comes from the consumer
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
                 String correlationId = (String) exchange.getProperty(correlation);
-                String previousId = (String) exchange.getProperty(previous);
-                Integer prevIndex = (Integer) exchange.getProperty(index);
                 if (correlationId == null) {
                     throw new IllegalStateException(correlation + " property not found");
                 }
-                if (prevIndex == null) {
-                    throw new IllegalStateException(previous + " property not found");
+                // Ack last target hit
+                MessageExchange me = (MessageExchange) store.load(correlationId);
+                done(me);
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                String correlationId = (String) exchange.getProperty(correlation);
+                if (correlationId == null) {
+                    throw new IllegalStateException(correlation + " property not found");
+                }
+                // Ack last target hit
+                MessageExchange me = (MessageExchange) store.load(correlationId);
+                done(me);
+            } else if (exchange instanceof InOut == false) {
+                throw new IllegalStateException("Use an InOut MEP");
+            } else {
+                MessageExchange me = exchangeFactory.createInOutExchange();
+                me.setProperty(correlation, exchange.getExchangeId());
+                me.setProperty(index, new Integer(0));
+                targets[0].configureTarget(me, getContext());
+                store.store(exchange.getExchangeId(), exchange);
+                MessageUtil.transferInToIn(exchange, me);
+                send(me);
+            }
+        // The exchange comes from a target
+        } else {
+            String correlationId = (String) exchange.getProperty(correlation);
+            String previousId = (String) exchange.getProperty(previous);
+            Integer prevIndex = (Integer) exchange.getProperty(index);
+            if (correlationId == null) {
+                throw new IllegalStateException(correlation + " property not found");
+            }
+            if (prevIndex == null) {
+                throw new IllegalStateException(previous + " property not found");
+            }
+            // This should never happen, as we can only send DONE
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
+            // ERROR are sent back to the consumer
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                MessageExchange me = (MessageExchange) store.load(correlationId);
+                fail(me, exchange.getError());
+                // Ack the previous target
+                if (previousId != null) {
+                    me = (MessageExchange) store.load(previousId);
+                    done(me);
+                }
+            // Faults are sent back to the consumer
+            } else if (exchange.getFault() != null) {
+                MessageExchange me = (MessageExchange) store.load(correlationId);
+                me.setProperty(correlation, exchange.getExchangeId());
+                store.store(exchange.getExchangeId(), exchange);
+                MessageUtil.transferFaultToFault(exchange, me);
+                send(me);
+                // Ack the previous target
+                if (previousId != null) {
+                    me = (MessageExchange) store.load(previousId);
+                    done(me);
                 }
-                // This should never happen, as we can only send DONE
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
-                // ERROR are sent back to the consumer
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            // Out message, give it to next target or back to consumer
+            } else if (exchange.getMessage("out") != null) {
+                // This is the answer from the last target
+                if (prevIndex.intValue() == targets.length - 1) {
                     MessageExchange me = (MessageExchange) store.load(correlationId);
-                    fail(me, exchange.getError());
-                    // Ack the previous target
+                    me.setProperty(correlation, exchange.getExchangeId());
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferOutToOut(exchange, me);
+                    send(me);
                     if (previousId != null) {
                         me = (MessageExchange) store.load(previousId);
                         done(me);
                     }
-                // Faults are sent back to the consumer
-                } else if (exchange.getFault() != null) {
-                    MessageExchange me = (MessageExchange) store.load(correlationId);
-                    me.setProperty(correlation, exchange.getExchangeId());
+                // We still have a target to hit
+                } else {
+                    MessageExchange me = exchangeFactory.createInOutExchange();
+                    Integer curIndex = new Integer(prevIndex.intValue() + 1);
+                    me.setProperty(correlation, correlationId);
+                    me.setProperty(index, curIndex);
+                    me.setProperty(previous, exchange.getExchangeId());
+                    targets[curIndex.intValue()].configureTarget(me, getContext());
                     store.store(exchange.getExchangeId(), exchange);
-                    MessageUtil.transferFaultToFault(exchange, me);
+                    MessageUtil.transferOutToIn(exchange, me);
                     send(me);
-                    // Ack the previous target
                     if (previousId != null) {
                         me = (MessageExchange) store.load(previousId);
                         done(me);
                     }
-                // Out message, give it to next target or back to consumer
-                } else if (exchange.getMessage("out") != null) {
-                    // This is the answer from the last target
-                    if (prevIndex.intValue() == targets.length - 1) {
-                        MessageExchange me = (MessageExchange) store.load(correlationId);
-                        me.setProperty(correlation, exchange.getExchangeId());
-                        store.store(exchange.getExchangeId(), exchange);
-                        MessageUtil.transferOutToOut(exchange, me);
-                        send(me);
-                        if (previousId != null) {
-                            me = (MessageExchange) store.load(previousId);
-                            done(me);
-                        }
-                    // We still have a target to hit
-                    } else {
-                        MessageExchange me = exchangeFactory.createInOutExchange();
-                        Integer curIndex = new Integer(prevIndex.intValue() + 1);
-                        me.setProperty(correlation, correlationId);
-                        me.setProperty(index, curIndex);
-                        me.setProperty(previous, exchange.getExchangeId());
-                        targets[curIndex.intValue()].configureTarget(me, getContext());
-                        store.store(exchange.getExchangeId(), exchange);
-                        MessageUtil.transferOutToIn(exchange, me);
-                        send(me);
-                        if (previousId != null) {
-                            me = (MessageExchange) store.load(previousId);
-                            done(me);
-                        }
-                    }
-                // This should not happen
-                } else {
-                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
                 }
-            }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+            // This should not happen
+            } else {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java Mon Jun  5 03:57:19 2006
@@ -19,11 +19,8 @@
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.eip.support.ExchangeTarget;
@@ -47,8 +44,6 @@
  */
 public class WireTap extends EIPEndpoint {
 
-    private static final Log log = LogFactory.getLog(WireTap.class);
-    
     /**
      * The main target destination which will receive the exchange
      */
@@ -140,70 +135,85 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processSync(MessageExchange exchange) throws Exception {
+        // Create exchange for target
+        MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+        target.configureTarget(tme, getContext());
+        sendSyncToListenerAndTarget(exchange, tme, inListener, "in");
+        if (tme.getStatus() == ExchangeStatus.DONE) {
+            done(exchange);
+        } else if (tme.getStatus() == ExchangeStatus.ERROR) {
+            fail(exchange, tme.getError());
+        } else if (tme.getFault() != null) {
+            sendSyncToListenerAndTarget(tme, exchange, faultListener, "fault");
+            done(tme);
+        } else if (tme.getMessage("out") != null) {
+            sendSyncToListenerAndTarget(tme, exchange, outListener, "out");
+            done(tme);
+        } else {
+            done(tme);
+            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+        }
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws MessagingException {
-        try {
-            if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
-                exchange.getProperty(correlation) == null) {
-                // Create exchange for target
-                MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
-                if (store.hasFeature(Store.CLUSTERED)) {
-                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
-                    tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+            exchange.getProperty(correlation) == null) {
+            // Create exchange for target
+            MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+            if (store.hasFeature(Store.CLUSTERED)) {
+                exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+                tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+            }
+            target.configureTarget(tme, getContext());
+            // Set correlations
+            exchange.setProperty(correlation, tme.getExchangeId());
+            tme.setProperty(correlation, exchange.getExchangeId());
+            // Put exchange to store
+            store.store(exchange.getExchangeId(), exchange);
+            // Send in to listener and target
+            sendToListenerAndTarget(exchange, tme, inListener, "in");
+        // Mimic the exchange on the other side and send to needed listener
+        } else {
+            String id = (String) exchange.getProperty(correlation);
+            if (id == null) {
+                if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
+                    exchange.getStatus() != ExchangeStatus.ACTIVE) {
+                    // This must be a listener status, so ignore
+                    return;
                 }
-                target.configureTarget(tme, getContext());
-                // Set correlations
-                exchange.setProperty(correlation, tme.getExchangeId());
-                tme.setProperty(correlation, exchange.getExchangeId());
-                // Put exchange to store
+                throw new IllegalStateException(correlation + " property not found");
+            }
+            MessageExchange org = (MessageExchange) store.load(id);
+            if (org == null) {
+                throw new IllegalStateException("Could not load original exchange with id " + id);
+            }
+            // Reproduce DONE status to the other side
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                done(org);
+            // Reproduce ERROR status to the other side
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                fail(org, exchange.getError());
+            // Reproduce faults to the other side and listeners
+            } else if (exchange.getFault() != null) {
                 store.store(exchange.getExchangeId(), exchange);
-                // Send in to listener and target
-                sendToTargetAndListener(exchange, tme, inListener, "in");
-            // Mimic the exchange on the other side and send to needed listener
+                sendToListenerAndTarget(exchange, org, faultListener, "fault");
+            // Reproduce answers to the other side
+            } else if (exchange.getMessage("out") != null) {
+                store.store(exchange.getExchangeId(), exchange);
+                sendToListenerAndTarget(exchange, org, outListener, "out");
             } else {
-                String id = (String) exchange.getProperty(correlation);
-                if (id == null) {
-                    if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
-                        exchange.getStatus() != ExchangeStatus.ACTIVE) {
-                        // This must be a listener status, so ignore
-                        return;
-                    }
-                    throw new IllegalStateException(correlation + " property not found");
-                }
-                MessageExchange org = (MessageExchange) store.load(id);
-                if (org == null) {
-                    throw new IllegalStateException("Could not load original exchange with id " + id);
-                }
-                // Reproduce DONE status to the other side
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    done(org);
-                // Reproduce ERROR status to the other side
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    fail(org, exchange.getError());
-                // Reproduce faults to the other side and listeners
-                } else if (exchange.getFault() != null) {
-                    store.store(exchange.getExchangeId(), exchange);
-                    sendToTargetAndListener(exchange, org, faultListener, "fault");
-                // Reproduce answers to the other side
-                } else if (exchange.getMessage("out") != null) {
-                    store.store(exchange.getExchangeId(), exchange);
-                    sendToTargetAndListener(exchange, org, outListener, "out");
-                } else {
-                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
-                }
-            }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
             }
         }
     }
     
-    private void sendToTargetAndListener(MessageExchange source, 
+    private void sendToListenerAndTarget(MessageExchange source, 
                                          MessageExchange dest, 
                                          ExchangeTarget listener,
                                          String message) throws Exception {
@@ -219,8 +229,29 @@
             MessageUtil.transferTo(msg, dest, message);
             send(dest);
         } else {
-           MessageUtil.transferTo(source, dest, message); 
-           send(dest);
+            MessageUtil.transferTo(source, dest, message);
+            send(dest);
+        }
+    }
+
+    private void sendSyncToListenerAndTarget(MessageExchange source, 
+                                             MessageExchange dest, 
+                                             ExchangeTarget listener,
+                                             String message) throws Exception {
+        if (listener != null) {
+            NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
+            InOnly lme = exchangeFactory.createInOnlyExchange();
+            if (store.hasFeature(Store.CLUSTERED)) {
+                lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+            }
+            listener.configureTarget(lme, getContext());
+            MessageUtil.transferToIn(msg, lme);
+            sendSync(lme);
+            MessageUtil.transferTo(msg, dest, message);
+            sendSync(dest);
+        } else {
+            MessageUtil.transferTo(source, dest, message);
+            sendSync(dest);
         }
     }
 

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Jun  5 03:57:19 2006
@@ -53,10 +53,26 @@
     private ExchangeTarget target;
     
     private boolean rescheduleTimeouts;
+    
+    private boolean synchronous;
 
     private ConcurrentMap closedAggregates = new ConcurrentHashMap();
     
     /**
+     * @return the synchronous
+     */
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * @param synchronous the synchronous to set
+     */
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
+    /**
      * @return the rescheduleTimeouts
      */
     public boolean isRescheduleTimeouts() {
@@ -84,80 +100,84 @@
         this.target = target;
     }
     
-    /*(non-Javadoc)
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processSync(MessageExchange exchange) throws Exception {
+        throw new IllegalStateException();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        throw new IllegalStateException();
+    }
+    
+    /* (non-Javadoc)
      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
      */
     public void process(MessageExchange exchange) throws Exception {
-        try {
-            // Skip DONE
-            if (exchange.getStatus() == ExchangeStatus.DONE) {
-                return;
-            // Skip ERROR
-            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                return;
-            // Handle an ACTIVE exchange as a PROVIDER
-            } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
-                if (exchange instanceof InOnly == false &&
-                    exchange instanceof RobustInOnly == false) {
-                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-                } else {
-                    NormalizedMessage in = MessageUtil.copyIn(exchange);
-                    final String correlationId = getCorrelationID(exchange, in);
-                    if (correlationId == null || correlationId.length() == 0) {
-                        throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
-                    }
-                    // Load existing aggregation
-                    Lock lock = getLockManager().getLock(correlationId);
-                    lock.lock();
-                    try {
-                        Object aggregation = store.load(correlationId);
-                        Date timeout = null;
-                        // Create a new aggregate
-                        if (aggregation == null) {
-                            if (isAggregationClosed(correlationId)) {
-                                // TODO: should we return an error here ?
-                            } else {
-                                aggregation = createAggregation(correlationId);
-                                timeout = getTimeout(aggregation);
-                            }
-                        } else if (isRescheduleTimeouts()) {
+        // Skip DONE
+        if (exchange.getStatus() == ExchangeStatus.DONE) {
+            return;
+        // Skip ERROR
+        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            return;
+        // Handle an ACTIVE exchange as a PROVIDER
+        } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            if (exchange instanceof InOnly == false &&
+                exchange instanceof RobustInOnly == false) {
+                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            } else {
+                NormalizedMessage in = MessageUtil.copyIn(exchange);
+                final String correlationId = getCorrelationID(exchange, in);
+                if (correlationId == null || correlationId.length() == 0) {
+                    throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
+                }
+                // Load existing aggregation
+                Lock lock = getLockManager().getLock(correlationId);
+                lock.lock();
+                try {
+                    Object aggregation = store.load(correlationId);
+                    Date timeout = null;
+                    // Create a new aggregate
+                    if (aggregation == null) {
+                        if (isAggregationClosed(correlationId)) {
+                            // TODO: should we return an error here ?
+                        } else {
+                            aggregation = createAggregation(correlationId);
                             timeout = getTimeout(aggregation);
-                            
                         }
-                        // If the aggregation is not closed
-                        if (aggregation != null) {
-                            if (addMessage(aggregation, in, exchange)) {
-                                sendAggregate(correlationId, aggregation, false);
-                            } else {
-                                store.store(correlationId, aggregation);
-                                if (timeout != null) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
-                                    }
-                                    getTimerManager().schedule(new TimerListener() {
-                                        public void timerExpired(Timer timer) {
-                                            AbstractAggregator.this.onTimeout(correlationId);
-                                        }
-                                    }, timeout);
+                    } else if (isRescheduleTimeouts()) {
+                        timeout = getTimeout(aggregation);
+                    }
+                    // If the aggregation is not closed
+                    if (aggregation != null) {
+                        if (addMessage(aggregation, in, exchange)) {
+                            sendAggregate(correlationId, aggregation, false);
+                        } else {
+                            store.store(correlationId, aggregation);
+                            if (timeout != null) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
                                 }
+                                getTimerManager().schedule(new TimerListener() {
+                                    public void timerExpired(Timer timer) {
+                                        AbstractAggregator.this.onTimeout(correlationId);
+                                    }
+                                }, timeout);
                             }
                         }
-                        done(exchange);
-                    } finally {
-                        lock.unlock();
                     }
+                    done(exchange);
+                } finally {
+                    lock.unlock();
                 }
-            // Handle an ACTIVE exchange as a CONSUMER
-            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                done(exchange);
-            }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
             }
+        // Handle an ACTIVE exchange as a CONSUMER
+        } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            done(exchange);
         }
     }
     
@@ -170,7 +190,11 @@
         me.setInMessage(nm);
         buildAggregate(aggregation, nm, me, timeout);
         closeAggregation(correlationId);
-        send(me);
+        if (isSynchronous()) {
+            sendSync(me);
+        } else {
+            send(me);
+        }
     }
 
     protected void onTimeout(String correlationId) {