You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/04/15 01:38:46 UTC

svn commit: r394227 [2/3] - in /incubator/servicemix/trunk: ./ servicemix-assembly/ servicemix-common/src/main/java/org/apache/servicemix/common/ servicemix-core/src/main/java/org/apache/servicemix/ servicemix-core/src/main/java/org/apache/servicemix/e...

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import java.net.URI;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+import org.apache.servicemix.jbi.FaultException;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+
+/**
+ * The Pipeline component is a bridge between an In-Only (or Robust-In-Only) MEP and
+ * an In-Out MEP.
+ * When the Pipeline receives an In-Only MEP, it will send the input in an In-Out MEP 
+ * to the transformer destination and forward the response in an In-Only MEP to the target
+ * destination.
+ * In addition, this component is fully asynchronous and uses an exchange store to provide
+ * full HA and recovery for clustered / persistent flows. 
+ *  
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="pipeline"
+ *                  description="A Pipeline"
+ */
+public class Pipeline extends EIPEndpoint {
+
+    private static final Log log = LogFactory.getLog(Pipeline.class);
+
+    private static final String TRANSFORMER = "Pipeline.Transformer";
+    
+    private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP";
+
+    /**
+     * The adress of the in-out endpoint acting as a transformer
+     */
+    private ExchangeTarget transformer;
+
+    /**
+     * The address of the target endpoint
+     */
+    private ExchangeTarget target;
+
+    /**
+     * The correlation property used by this component
+     */
+    private String correlationConsumer;
+
+    /**
+     * The correlation property used by this component
+     */
+    private String correlationTransformer;
+
+    /**
+     * The correlation property used by this component
+     */
+    private String correlationTarget;
+
+    /**
+     * @return Returns the target.
+     */
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target The target to set.
+     */
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+
+    /**
+     * @return Returns the transformer.
+     */
+    public ExchangeTarget getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * @param transformer The transformer to set.
+     */
+    public void setTransformer(ExchangeTarget transformer) {
+        this.transformer = transformer;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+     */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Check target
+        if (target == null) {
+            throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
+        }
+        // Check transformer
+        if (transformer == null) {
+            throw new IllegalArgumentException("transformer should be set to a valid ExchangeTarget");
+        }
+        // Create correlation properties
+        correlationConsumer = "Pipeline.Consumer." + getService() + "." + getEndpoint();
+        correlationTransformer = "Pipeline.Transformer." + getService() + "." + getEndpoint();
+        correlationTarget = "Pipeline.Target." + getService() + "." + getEndpoint();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     */
+    public void process(MessageExchange exchange) throws MessagingException {
+        try {
+            // The exchange comes from the consumer
+            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+                // A DONE status from the consumer can only be received
+                // when a fault has been sent
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    String transformerId = (String) exchange.getProperty(correlationTransformer);
+                    String targetId = (String) exchange.getProperty(correlationTarget);
+                    if (transformerId == null && targetId == null) {
+                        throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
+                    }
+                    // Load the exchange
+                    MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+                    done(me);
+                // Errors must be sent back to the target or transformer
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    String transformerId = (String) exchange.getProperty(correlationTransformer);
+                    String targetId = (String) exchange.getProperty(correlationTarget);
+                    if (transformerId == null && targetId == null) {
+                        throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
+                    }
+                    // Load the exchange
+                    MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+                    fail(me, exchange.getError());
+                // This is a new exchange
+                } else if (exchange.getProperty(correlationTransformer) == null) {
+                    if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
+                        fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+                        return;
+                    }
+                    // Create exchange for target
+                    MessageExchange tme = exchangeFactory.createInOutExchange();
+                    transformer.configureTarget(tme, getContext());
+                    // Set correlations
+                    exchange.setProperty(correlationTransformer, tme.getExchangeId());
+                    tme.setProperty(correlationConsumer, exchange.getExchangeId());
+                    tme.setProperty(TRANSFORMER, Boolean.TRUE);
+                    tme.setProperty(CONSUMER_MEP, exchange.getPattern());
+                    // Put exchange to store
+                    store.store(exchange.getExchangeId(), exchange);
+                    // Send in to listener and target
+                    MessageUtil.transferInToIn(exchange, tme);
+                    send(tme);
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+                }
+            // If the exchange comes from the transformer
+            } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
+                // Retrieve the correlation id
+                String consumerId = (String) exchange.getProperty(correlationConsumer);
+                if (consumerId == null) {
+                    throw new IllegalStateException(correlationConsumer + " property not found");
+                }
+                // This should not happen beacause the MEP is an In-Out
+                // and the DONE status is always sent by the consumer (us)
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    throw new IllegalStateException("Received a DONE status from the transformer");
+                // Errors must be sent back to the consumer
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    MessageExchange me = (MessageExchange) store.load(consumerId);
+                    fail(me, exchange.getError());
+                // Faults must be sent back to the consumer
+                } else if (exchange.getFault() != null) {
+                    MessageExchange me = (MessageExchange) store.load(consumerId);
+                    if (me instanceof InOnly) {
+                        // Do not use the fault has it may contain streams
+                        // So just transform it to a string and send an error
+                        String fault = new SourceTransformer().contentToString(exchange.getFault());
+                        fail(me, new FaultException(fault, null, null));
+                        done(exchange);
+                    } else {
+                        store.store(exchange.getExchangeId(), exchange);
+                        MessageUtil.transferFaultToFault(exchange, me);
+                        send(me);
+                    }
+                // This is the answer from the transformer
+                } else if (exchange.getMessage("out") != null) {
+                    // Retrieve the consumer MEP
+                    URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
+                    if (mep == null) {
+                        throw new IllegalStateException("Exchange does not carry the consumer MEP");
+                    }
+                    MessageExchange me = exchangeFactory.createExchange(mep);
+                    target.configureTarget(me, getContext());
+                    me.setProperty(correlationConsumer, consumerId);
+                    me.setProperty(correlationTransformer, exchange.getExchangeId());
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferOutToIn(exchange, me);
+                    send(me);
+                // This should not happen
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+                }
+            // The exchange comes from the target
+            } else {
+                // Retrieve the correlation id for the consumer
+                String consumerId = (String) exchange.getProperty(correlationConsumer);
+                if (consumerId == null) {
+                    throw new IllegalStateException(correlationConsumer + " property not found");
+                }
+                // Retrieve the correlation id for the transformer
+                String transformerId = (String) exchange.getProperty(correlationTransformer);
+                if (transformerId == null) {
+                    throw new IllegalStateException(correlationTransformer + " property not found");
+                }
+                // This should be the last message received
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    // Need to ack the transformer
+                    MessageExchange tme = (MessageExchange) store.load(transformerId);
+                    done(tme);
+                    // Need to ack the consumer
+                    MessageExchange cme = (MessageExchange) store.load(consumerId);
+                    done(cme);
+                // Errors should be sent back to the consumer
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    // Need to ack the transformer
+                    MessageExchange tme = (MessageExchange) store.load(transformerId);
+                    done(tme);
+                    // Send error to consumer
+                    MessageExchange cme = (MessageExchange) store.load(consumerId);
+                    fail(cme, exchange.getError());
+                // If we have a robust-in-only MEP, we can receive a fault
+                } else if (exchange.getFault() != null) {
+                    // Need to ack the transformer
+                    MessageExchange tme = (MessageExchange) store.load(transformerId);
+                    done(tme);
+                    // Send fault back to consumer
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageExchange cme = (MessageExchange) store.load(consumerId);
+                    cme.setProperty(correlationTarget, exchange.getExchangeId());
+                    MessageUtil.transferFaultToFault(exchange, cme);
+                    send(cme);
+                // This should not happen
+                } else {
+                    throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
+                }
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE 
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+
+/**
+ * The StaticRecipientList component will forward an input In-Only or Robust-In-Only
+ * exchange to a list of known recipients.
+ * This component implements the  
+ * <a href="http://www.enterpriseintegrationpatterns.com/RecipientList.html">Recipient List</a> 
+ * pattern, with the limitation that the recipient list is static.
+ * 
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="static-recipient-list"
+ *                  description="A static Recipient List"
+ */
+public class StaticRecipientList extends EIPEndpoint {
+
+    private static final Log log = LogFactory.getLog(StaticRecipientList.class);
+
+    /**
+     * List of recipients
+     */
+    private ExchangeTarget[] recipients;
+    /**
+     * Indicates if faults and errors from recipients should be sent
+     * back to the consumer.  In such a case, only the first fault or
+     * error received will be reported.
+     * Note that if the consumer is synchronous, it will be blocked
+     * until all recipients successfully acked the exchange, or
+     * a fault or error is reported, and the exchange will be kept in the
+     * store for recovery.  This mode is not implemented yet (see process).
+     */
+    private boolean reportErrors;
+    /**
+     * The correlation property used by this component
+     */
+    private String correlation;
+
+    /**
+     * @return Returns the recipients.
+     */
+    public ExchangeTarget[] getRecipients() {
+        return recipients;
+    }
+
+    /**
+     * @param recipients The recipients to set.
+     */
+    public void setRecipients(ExchangeTarget[] recipients) {
+        this.recipients = recipients;
+    }
+
+    /**
+     * @return Returns the reportErrors.
+     */
+    public boolean isReportErrors() {
+        return reportErrors;
+    }
+
+    /**
+     * @param reportErrors The reportErrors to set.
+     */
+    public void setReportErrors(boolean reportErrors) {
+        this.reportErrors = reportErrors;
+    }
+
+    /* Checks that at least one recipient is configured, then builds the
+     * correlation property name from the service and endpoint names.
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate() */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Check recipients
+        if (recipients == null || recipients.length == 0) {
+            throw new IllegalArgumentException("recipients should contain at least one ExchangeTarget");
+        }
+        // Create correlation property
+        correlation = "StaticRecipientList.Correlation." + getService() + "." + getEndpoint();
+    }
+
+    /* Sends a copy of the in message to every recipient, then acks the
+     * consumer; any exception is logged and, if still ACTIVE, fails the exchange.
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange) */
+    public void process(MessageExchange exchange) throws MessagingException {
+        try {
+            // If we need to report errors, the behavior is really different,
+            // as we need to keep the incoming exchange in the store until
+            // all acks have been received
+            if (reportErrors) {
+                // TODO: implement this
+                throw new UnsupportedOperationException("Not implemented");
+            // We are in a simple fire-and-forget behaviour.
+            // This implementation is really efficient as we do not use
+            // the store at all.
+            } else {
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    return;
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    return;
+                } else if (exchange instanceof InOnly == false &&
+                           exchange instanceof RobustInOnly == false) {
+                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+                } else if (exchange.getFault() != null) {
+                    done(exchange);
+                } else {
+                    NormalizedMessage in = MessageUtil.copyIn(exchange);
+                    for (int i = 0; i < recipients.length; i++) {
+                        MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+                        recipients[i].configureTarget(me, getContext());
+                        MessageUtil.transferToIn(in, me);
+                        send(me);
+                    }
+                    done(exchange);
+                }
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+
+/**
+ * A RoutingSlip component can be used to route an incoming In-Out exchange
+ * through a series of target services.
+ * This component implements the 
+ * <a href="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a> 
+ * pattern, with the limitation that the routing table is static.
+ * This component only uses In-Out MEPs and errors or faults sent by targets are reported
+ * back to the consumer, thus interrupting the routing process.
+ * In addition, this component is fully asynchronous and uses an exchange store to provide
+ * full HA and recovery for clustered / persistent flows. 
+ *  
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="static-routing-slip"
+ *                  description="A static Routing Slip"
+ */
+public class StaticRoutingSlip extends EIPEndpoint {
+
+    private static final Log log = LogFactory.getLog(StaticRoutingSlip.class);
+
+    /**
+     * List of target components used in the RoutingSlip
+     */
+    private ExchangeTarget[] targets;
+    /**
+     * The correlation property used by this component
+     */
+    private String correlation;
+    /**
+     * Name of the exchange property carrying the current index of the target
+     */
+    private String index;
+    /**
+     * Name of the exchange property carrying the id of the previous target exchange
+     */
+    private String previous;
+
+    /**
+     * @return Returns the targets.
+     */
+    public ExchangeTarget[] getTargets() {
+        return targets;
+    }
+
+    /**
+     * @param targets The targets to set.
+     */
+    public void setTargets(ExchangeTarget[] targets) {
+        this.targets = targets;
+    }
+
+    /* Checks that at least one target is configured, then builds the
+     * property names (correlation, index, previous) for this endpoint.
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate() */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Check target
+        if (targets == null || targets.length == 0) {
+            throw new IllegalArgumentException("targets should contain at least one ExchangeTarget");
+        }
+        // Create correlation properties
+        correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint();
+        index = "RoutingSlip.Index." + getService() + "." + getEndpoint();
+        previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint();
+    }
+
+    /* Dispatches on the origin of the exchange (consumer or a target);
+     * any exception is logged and, if still ACTIVE, fails the exchange.
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange) */
+    public void process(MessageExchange exchange) throws MessagingException {
+        try {
+            // This exchange comes from the consumer
+            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    String correlationId = (String) exchange.getProperty(correlation);
+                    if (correlationId == null) {
+                        throw new IllegalStateException(correlation + " property not found");
+                    }
+                    // Ack last target hit
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    done(me);
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    String correlationId = (String) exchange.getProperty(correlation);
+                    if (correlationId == null) {
+                        throw new IllegalStateException(correlation + " property not found");
+                    }
+                    // Ack last target hit
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    done(me);
+                } else if (exchange instanceof InOut == false) {
+                    throw new IllegalStateException("Use an InOut MEP");
+                } else {
+                    MessageExchange me = exchangeFactory.createInOutExchange();
+                    me.setProperty(correlation, exchange.getExchangeId());
+                    me.setProperty(index, new Integer(0));
+                    targets[0].configureTarget(me, getContext());
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferInToIn(exchange, me);
+                    send(me);
+                }
+            // The exchange comes from a target
+            } else {
+                String correlationId = (String) exchange.getProperty(correlation);
+                String previousId = (String) exchange.getProperty(previous);
+                Integer prevIndex = (Integer) exchange.getProperty(index);
+                if (correlationId == null) {
+                    throw new IllegalStateException(correlation + " property not found");
+                }
+                if (prevIndex == null) {
+                    throw new IllegalStateException(index + " property not found");
+                }
+                // This should never happen, as we can only send DONE
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
+                // ERROR are sent back to the consumer
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    fail(me, exchange.getError());
+                    // Ack the previous target
+                    if (previousId != null) {
+                        me = (MessageExchange) store.load(previousId);
+                        done(me);
+                    }
+                // Faults are sent back to the consumer
+                } else if (exchange.getFault() != null) {
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    me.setProperty(correlation, exchange.getExchangeId());
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferFaultToFault(exchange, me);
+                    send(me);
+                    // Ack the previous target
+                    if (previousId != null) {
+                        me = (MessageExchange) store.load(previousId);
+                        done(me);
+                    }
+                // Out message, give it to next target or back to consumer
+                } else if (exchange.getMessage("out") != null) {
+                    // This is the answer from the last target
+                    if (prevIndex.intValue() == targets.length - 1) {
+                        MessageExchange me = (MessageExchange) store.load(correlationId);
+                        me.setProperty(correlation, exchange.getExchangeId());
+                        store.store(exchange.getExchangeId(), exchange);
+                        MessageUtil.transferOutToOut(exchange, me);
+                        send(me);
+                        if (previousId != null) {
+                            me = (MessageExchange) store.load(previousId);
+                            done(me);
+                        }
+                    // We still have a target to hit
+                    } else {
+                        MessageExchange me = exchangeFactory.createInOutExchange();
+                        Integer curIndex = new Integer(prevIndex.intValue() + 1);
+                        me.setProperty(correlation, correlationId);
+                        me.setProperty(index, curIndex);
+                        me.setProperty(previous, exchange.getExchangeId());
+                        targets[curIndex.intValue()].configureTarget(me, getContext());
+                        store.store(exchange.getExchangeId(), exchange);
+                        MessageUtil.transferOutToIn(exchange, me);
+                        send(me);
+                        if (previousId != null) {
+                            me = (MessageExchange) store.load(previousId);
+                            done(me);
+                        }
+                    }
+                // This should not happen
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+                }
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+import org.apache.servicemix.store.Store;
+
+/**
+ *
+ * A WireTap component can be used to forward a copy of the input message to a listener.
+ * This component implements the 
+ * <a href="http://www.enterpriseintegrationpatterns.com/WireTap.html">WireTap</a> 
+ * pattern.
+ * It can handle all 4 standard MEPs, but will only send an In-Only MEP to the listener.
+ * In addition, this component is fully asynchronous and uses an exchange store to provide
+ * full HA and recovery for clustered / persistent flows. 
+ * 
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="wire-tap"
+ *                  description="A WireTap"
+ */
+public class WireTap extends EIPEndpoint {
+
+    private static final Log log = LogFactory.getLog(WireTap.class);
+    
+    /**
+     * The main target destination which will receive the exchange
+     */
+    private ExchangeTarget target;
+    /**
+     * The listener destination for in messages
+     */
+    private ExchangeTarget inListener;
+    /**
+     * The listener destination for out messages
+     */
+    private ExchangeTarget outListener;
+    /**
+     * The listener destination for fault messages
+     */
+    private ExchangeTarget faultListener;
+    /**
+     * The correlation property used by this component
+     */
+    private String correlation;
+    
+    /**
+     * @return Returns the target.
+     */
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target The target to set.
+     */
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+
+    /**
+     * @return Returns the faultListener.
+     */
+    public ExchangeTarget getFaultListener() {
+        return faultListener;
+    }
+
+    /**
+     * @param faultListener The faultListener to set.
+     */
+    public void setFaultListener(ExchangeTarget faultListener) {
+        this.faultListener = faultListener;
+    }
+
+    /**
+     * @return Returns the inListener.
+     */
+    public ExchangeTarget getInListener() {
+        return inListener;
+    }
+
+    /**
+     * @param inListener The inListener to set.
+     */
+    public void setInListener(ExchangeTarget inListener) {
+        this.inListener = inListener;
+    }
+
+    /**
+     * @return Returns the outListener.
+     */
+    public ExchangeTarget getOutListener() {
+        return outListener;
+    }
+
+    /**
+     * @param outListener The outListener to set.
+     */
+    public void setOutListener(ExchangeTarget outListener) {
+        this.outListener = outListener;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+     */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Check target
+        if (target == null) {
+            throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
+        }
+        // Create correlation property
+        correlation = "WireTap.Correlation." + getService() + "." + getEndpoint();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     */
+    public void process(MessageExchange exchange) throws MessagingException {
+        try {
+            if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+                exchange.getProperty(correlation) == null) {
+                // Create exchange for target
+                MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+                if (store.hasFeature(Store.CLUSTERED)) {
+                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+                    tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+                }
+                target.configureTarget(tme, getContext());
+                // Set correlations
+                exchange.setProperty(correlation, tme.getExchangeId());
+                tme.setProperty(correlation, exchange.getExchangeId());
+                // Put exchange to store
+                store.store(exchange.getExchangeId(), exchange);
+                // Send in to listener and target
+                sendToTargetAndListener(exchange, tme, inListener, "in");
+            // Mimic the exchange on the other side and send to needed listener
+            } else {
+                String id = (String) exchange.getProperty(correlation);
+                if (id == null) {
+                    if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
+                        exchange.getStatus() != ExchangeStatus.ACTIVE) {
+                        // This must be a listener status, so ignore
+                        return;
+                    }
+                    throw new IllegalStateException(correlation + " property not found");
+                }
+                MessageExchange org = (MessageExchange) store.load(id);
+                if (org == null) {
+                    throw new IllegalStateException("Could not load original exchange with id " + id);
+                }
+                // Reproduce DONE status to the other side
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    done(org);
+                // Reproduce ERROR status to the other side
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    fail(org, exchange.getError());
+                // Reproduce faults to the other side and listeners
+                } else if (exchange.getFault() != null) {
+                    store.store(exchange.getExchangeId(), exchange);
+                    sendToTargetAndListener(exchange, org, faultListener, "fault");
+                // Reproduce answers to the other side
+                } else if (exchange.getMessage("out") != null) {
+                    store.store(exchange.getExchangeId(), exchange);
+                    sendToTargetAndListener(exchange, org, outListener, "out");
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+                }
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE 
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+    
+    private void sendToTargetAndListener(MessageExchange source, 
+                                         MessageExchange dest, 
+                                         ExchangeTarget listener,
+                                         String message) throws Exception {
+        if (listener != null) {
+            NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
+            InOnly lme = exchangeFactory.createInOnlyExchange();
+            if (store.hasFeature(Store.CLUSTERED)) {
+                lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+            }
+            listener.configureTarget(lme, getContext());
+            MessageUtil.transferToIn(msg, lme);
+            send(lme);
+            MessageUtil.transferTo(msg, dest, message);
+            send(dest);
+        } else {
+           MessageUtil.transferTo(source, dest, message); 
+           send(dest);
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.xml.namespace.NamespaceContext;
+import javax.xml.transform.Source;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.xpath.XPathFactory;
+import javax.xml.xpath.XPathFunctionResolver;
+
+import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.expression.JAXPNodeSetXPathExpression;
+import org.apache.servicemix.expression.MessageVariableResolver;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ * The XPathSplitter component implements the 
+ * <a href="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a>
+ * pattern using an xpath expression to split the incoming xml. 
+ * 
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="xpath-splitter"
+ *                  description="An XPath Splitter"
+ */
+public class XPathSplitter extends AbstractSplitter {
+
+    /**
+     * The expression selecting the nodes that become the parts.
+     * All xpath related properties of this splitter are delegated to it.
+     */
+    private JAXPNodeSetXPathExpression xpathExpression = new JAXPNodeSetXPathExpression();
+
+    /**
+     * Validates the splitter, then initializes the xpath expression.
+     *
+     * @throws DeploymentException if the xpath expression can not be initialized
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+     */
+    public void validate() throws DeploymentException {
+        super.validate();
+        try {
+            xpathExpression.afterPropertiesSet();
+        } catch (Exception e) {
+            throw new DeploymentException("Error validating xpath expression", e);
+        }
+    }
+
+    /**
+     * Splits the given content: it is converted to a DOM node, the xpath
+     * expression is evaluated against it and each node of the result is
+     * wrapped in its own DOMSource.
+     *
+     * @param main the content to split
+     * @return one source per node selected by the xpath expression
+     * @see org.apache.servicemix.eip.support.AbstractSplitter#split(javax.xml.transform.Source)
+     */
+    protected Source[] split(Source main) throws Exception {
+        Node root = new SourceTransformer().toDOMNode(main);
+        NodeList selected = (NodeList) xpathExpression.evaluateXPath(root);
+        int count = selected.getLength();
+        Source[] result = new Source[count];
+        int pos = 0;
+        while (pos < count) {
+            result[pos] = new DOMSource(selected.item(pos));
+            pos++;
+        }
+        return result;
+    }
+
+    /**
+     * @return the xpath string of the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#getXPath()
+     */
+    public String getXPath() {
+        return xpathExpression.getXPath();
+    }
+
+    /**
+     * Sets the xpath string on the underlying expression.
+     *
+     * @org.apache.xbean.Property alias="xpath"
+     */
+    public void setXPath(String xpath) {
+        xpathExpression.setXPath(xpath);
+    }
+
+    /**
+     * @return the xpath factory of the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#getFactory()
+     */
+    public XPathFactory getFactory() {
+        return xpathExpression.getFactory();
+    }
+
+    /**
+     * @param factory the xpath factory to set on the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#setFactory(javax.xml.xpath.XPathFactory)
+     */
+    public void setFactory(XPathFactory factory) {
+        xpathExpression.setFactory(factory);
+    }
+
+    /**
+     * @return the function resolver of the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#getFunctionResolver()
+     */
+    public XPathFunctionResolver getFunctionResolver() {
+        return xpathExpression.getFunctionResolver();
+    }
+
+    /**
+     * @param functionResolver the function resolver to set on the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#setFunctionResolver(javax.xml.xpath.XPathFunctionResolver)
+     */
+    public void setFunctionResolver(XPathFunctionResolver functionResolver) {
+        xpathExpression.setFunctionResolver(functionResolver);
+    }
+
+    /**
+     * @return the namespace context of the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#getNamespaceContext()
+     */
+    public NamespaceContext getNamespaceContext() {
+        return xpathExpression.getNamespaceContext();
+    }
+
+    /**
+     * @param namespaceContext the namespace context to set on the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#setNamespaceContext(javax.xml.namespace.NamespaceContext)
+     */
+    public void setNamespaceContext(NamespaceContext namespaceContext) {
+        xpathExpression.setNamespaceContext(namespaceContext);
+    }
+
+    /**
+     * @return the source transformer of the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#getTransformer()
+     */
+    public SourceTransformer getTransformer() {
+        return xpathExpression.getTransformer();
+    }
+
+    /**
+     * @param transformer the source transformer to set on the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#setTransformer(org.apache.servicemix.jbi.jaxp.SourceTransformer)
+     */
+    public void setTransformer(SourceTransformer transformer) {
+        xpathExpression.setTransformer(transformer);
+    }
+
+    /**
+     * @return the variable resolver of the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#getVariableResolver()
+     */
+    public MessageVariableResolver getVariableResolver() {
+        return xpathExpression.getVariableResolver();
+    }
+
+    /**
+     * @param variableResolver the variable resolver to set on the underlying expression
+     * @see org.apache.servicemix.expression.JAXPXPathExpression#setVariableResolver(org.apache.servicemix.expression.MessageVariableResolver)
+     */
+    public void setVariableResolver(MessageVariableResolver variableResolver) {
+        xpathExpression.setVariableResolver(variableResolver);
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+import javax.xml.transform.Source;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+
+/**
+ * The AbstractSplitter is an abstract base class for Splitters.
+ * This component implements the  
+ * <a href="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a> 
+ * pattern.
+ *  
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public abstract class AbstractSplitter extends EIPEndpoint {
+
+    private static final Log log = LogFactory.getLog(AbstractSplitter.class);
+
+    /**
+     * The target to which every part is sent
+     */
+    private ExchangeTarget target;
+    /**
+     * Indicates if faults and errors from split parts should be sent
+     * back to the consumer.  In such a case, only the first fault or
+     * error received will be reported.
+     * Note that if the consumer is synchronous, it will be blocked
+     * until all parts have been successfully acked, or
+     * a fault or error is reported, and the exchange will be kept in the
+     * store for recovery.
+     * This mode is not implemented yet: when the flag is set,
+     * {@link #process(MessageExchange)} fails with an
+     * UnsupportedOperationException.
+     */
+    private boolean reportErrors;
+    /**
+     * Indicates if incoming attachments should be forwarded with the new exchanges.
+     */
+    private boolean forwardAttachments;
+    /**
+     * Indicates if properties on the incoming message should be forwarded.
+     */
+    private boolean forwardProperties;
+    /**
+     * The correlation property used by this component.
+     * It is computed in {@link #validate()} and is not read yet by this class.
+     */
+    private String correlation;
+
+    /**
+     * @return Returns the reportErrors.
+     */
+    public boolean isReportErrors() {
+        return reportErrors;
+    }
+
+    /**
+     * @param reportErrors The reportErrors to set.
+     */
+    public void setReportErrors(boolean reportErrors) {
+        this.reportErrors = reportErrors;
+    }
+
+    /**
+     * @return Returns the target.
+     */
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target The target to set.
+     */
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+
+    /**
+     * @return Returns the forwardAttachments.
+     */
+    public boolean isForwardAttachments() {
+        return forwardAttachments;
+    }
+
+    /**
+     * @param forwardAttachments The forwardAttachments to set.
+     */
+    public void setForwardAttachments(boolean forwardAttachments) {
+        this.forwardAttachments = forwardAttachments;
+    }
+
+    /**
+     * @return Returns the forwardProperties.
+     */
+    public boolean isForwardProperties() {
+        return forwardProperties;
+    }
+
+    /**
+     * @param forwardProperties The forwardProperties to set.
+     */
+    public void setForwardProperties(boolean forwardProperties) {
+        this.forwardProperties = forwardProperties;
+    }
+
+    /**
+     * Checks the configuration and computes the name of the correlation property.
+     *
+     * @throws IllegalArgumentException if no target has been configured
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+     */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Check target
+        if (target == null) {
+            throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
+        }
+        // Create correlation property.  The name is built from the service and
+        // endpoint of this splitter rather than from the component name, so that
+        // two splitters deployed on the same component do not share a property.
+        correlation = "Splitter.Correlation." + getService() + "." + getEndpoint();
+    }
+
+    /**
+     * Processes an exchange delivered to this endpoint.
+     * In the fire-and-forget mode (reportErrors not set):
+     * exchanges whose status is DONE or ERROR are ignored, exchanges
+     * which are neither InOnly nor RobustInOnly are failed, exchanges carrying
+     * a fault are acknowledged with done(), and any other exchange is split,
+     * each part is sent to the target, then the exchange is acknowledged.
+     * Any exception is logged and, if the exchange is still ACTIVE, reported
+     * back through fail().
+     *
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     */
+    public void process(MessageExchange exchange) throws MessagingException {
+        try {
+            // If we need to report errors, the behavior is really different,
+            // as we need to keep the incoming exchange in the store until
+            // all acks have been received
+            if (reportErrors) {
+                // TODO: implement this
+                throw new UnsupportedOperationException("Not implemented");
+            // We are in a simple fire-and-forget behaviour.
+            // This implementation is really efficient as we do not use
+            // the store at all.
+            } else {
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    return;
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    return;
+                } else if (!(exchange instanceof InOnly) &&
+                           !(exchange instanceof RobustInOnly)) {
+                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+                } else if (exchange.getFault() != null) {
+                    done(exchange);
+                } else {
+                    MessageExchange[] parts = createParts(exchange);
+                    for (int i = 0; i < parts.length; i++) {
+                        target.configureTarget(parts[i], getContext());
+                        send(parts[i]);
+                    }
+                    done(exchange);
+                }
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+
+    /**
+     * Creates one exchange per part of the incoming message.
+     * The content of a copy of the "in" message is handed to
+     * {@link #split(Source)}; each resulting source becomes the content of a
+     * new exchange using the same MEP as the incoming exchange.
+     *
+     * @param exchange the exchange to split
+     * @return the exchanges to send, in the order returned by split
+     */
+    protected MessageExchange[] createParts(MessageExchange exchange) throws Exception {
+        NormalizedMessage in = MessageUtil.copyIn(exchange);
+        Source[] srcParts = split(in.getContent());
+        MessageExchange[] parts = new MessageExchange[srcParts.length];
+        for (int i = 0; i < srcParts.length; i++) {
+            parts[i] = createPart(exchange.getPattern(), in, srcParts[i]);
+        }
+        return parts;
+    }
+
+    /**
+     * Creates the exchange for a single part.
+     * The attachments and properties of the source message are copied on the
+     * new "in" message only when forwardAttachments, respectively
+     * forwardProperties, is set.
+     *
+     * @param pattern the MEP of the exchange to create
+     * @param srcMessage the message from which attachments and properties are taken
+     * @param content the content of the new "in" message
+     * @return the new exchange, with its "in" message set
+     */
+    protected MessageExchange createPart(URI pattern,
+                                         NormalizedMessage srcMessage,
+                                         Source content) throws Exception {
+        MessageExchange me = exchangeFactory.createExchange(pattern);
+        NormalizedMessage in = me.createMessage();
+        in.setContent(content);
+        me.setMessage(in, "in");
+        if (forwardAttachments) {
+            Set names = srcMessage.getAttachmentNames();
+            for (Iterator iter = names.iterator(); iter.hasNext();) {
+                String name = (String) iter.next();
+                in.addAttachment(name, srcMessage.getAttachment(name));
+            }
+        }
+        if (forwardProperties) {
+            Set names  = srcMessage.getPropertyNames();
+            for (Iterator iter = names.iterator(); iter.hasNext();) {
+                String name = (String) iter.next();
+                in.setProperty(name, srcMessage.getProperty(name));
+            }
+        }
+        return me;
+    }
+
+    /**
+     * Splits the content of the incoming message.
+     *
+     * @param main the content of the incoming "in" message
+     * @return the content of each part
+     */
+    protected abstract Source[] split(Source main) throws Exception;
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,123 @@
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.springframework.beans.factory.InitializingBean;
+
+/**
+ * An ExchangeTarget may be used to specify the target of an exchange,
+ * while retaining all the JBI features (interface based routing, service
+ * name based routing or endpoint routing).
+ *   
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="exchange-target"
+ */
+public class ExchangeTarget implements InitializingBean {
+
+    private QName _interface;
+
+    private QName operation;
+
+    private QName service;
+
+    private String endpoint;
+
+    /**
+     * @return Returns the endpointName.
+     */
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    /**
+     * @param endpointName
+     *            The endpointName to set.
+     */
+    public void setEndpoint(String endpointName) {
+        this.endpoint = endpointName;
+    }
+
+    /**
+     * @return Returns the interface name.
+     */
+    public QName getInterface() {
+        return _interface;
+    }
+
+    /**
+     * @param interface name
+     *            The interface name to set.
+     */
+    public void setInterface(QName _interface) {
+        this._interface = _interface;
+    }
+
+    /**
+     * @return Returns the operation name.
+     */
+    public QName getOperation() {
+        return operation;
+    }
+
+    /**
+     * @param operation
+     *            The operation to set.
+     */
+    public void setOperation(QName operation) {
+        this.operation = operation;
+    }
+
+    /**
+     * @return Returns the serviceName.
+     */
+    public QName getService() {
+        return service;
+    }
+
+    /**
+     * @param serviceName
+     *            The serviceName to set.
+     */
+    public void setService(QName serviceName) {
+        this.service = serviceName;
+    }
+
+    /**
+     * Configures the target on the newly created exchange 
+     * @param exchange the exchange to configure
+     * @throws MessagingException if the target could not be configured
+     */
+    public void configureTarget(MessageExchange exchange, ComponentContext context) throws MessagingException {
+        if (_interface == null && service == null) {
+            throw new MessagingException("interfaceName or serviceName should be specified");
+        }
+        if (_interface != null) {
+            exchange.setInterfaceName(_interface);
+        }
+        if (operation != null) {
+            exchange.setOperation(operation);
+        }
+        if (service != null) {
+            exchange.setService(service);
+            if (endpoint != null) {
+                ServiceEndpoint se = context.getEndpoint(service, endpoint);
+                exchange.setEndpoint(se);
+            }
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+     */
+    public void afterPropertiesSet() throws Exception {
+        if (_interface == null && service == null) {
+            throw new MessagingException("interfaceName or serviceName should be specified");
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.security.auth.Subject;
+import javax.xml.transform.Source;
+
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.ByteArrayDataSource;
+import org.apache.servicemix.jbi.util.FileUtil;
+
+/**
+ * Static helpers for copying and transferring JBI normalized messages
+ * between message exchanges.
+ * <p>
+ * The <code>transfer*</code> methods hand the content, properties, attachments
+ * and security subject of one message over to another message as they are.
+ * The <code>copy*</code> methods build a detached in-memory snapshot instead:
+ * the content is serialized to a string and attachments are buffered into
+ * byte arrays.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public class MessageUtil {
+
+    /**
+     * Transfers the content, all properties, all attachments and the security
+     * subject of <code>source</code> onto <code>dest</code>. The values are
+     * passed on as they are; no copy of the content or attachments is made.
+     *
+     * @param source the message to read from
+     * @param dest the message to populate
+     * @throws Exception if setting the content or an attachment on the
+     *             destination fails
+     */
+    public static void transfer(NormalizedMessage source, NormalizedMessage dest) throws Exception {
+        dest.setContent(source.getContent());
+        for (Iterator it = source.getPropertyNames().iterator(); it.hasNext();) {
+            String name = (String) it.next();
+            dest.setProperty(name, source.getProperty(name));
+        }
+        for (Iterator it = source.getAttachmentNames().iterator(); it.hasNext();) {
+            String name = (String) it.next();
+            dest.addAttachment(name, source.getAttachment(name));
+        }
+        dest.setSecuritySubject(source.getSecuritySubject());
+    }
+
+    /**
+     * Creates a detached in-memory snapshot of the given message. A
+     * {@link Fault} is copied into a snapshot that is itself a {@link Fault}.
+     *
+     * @param source the message to copy
+     * @return the snapshot
+     * @throws Exception if the content or an attachment cannot be read
+     */
+    public static NormalizedMessage copy(NormalizedMessage source) throws Exception {
+        if (source instanceof Fault) {
+            return new FaultImpl((Fault) source);
+        } else {
+            return new NormalizedMessageImpl(source);
+        }
+    }
+
+    /**
+     * Creates a snapshot of the "in" message of the exchange.
+     *
+     * @see #copy(NormalizedMessage)
+     */
+    public static NormalizedMessage copyIn(MessageExchange exchange) throws Exception {
+        return copy(exchange.getMessage("in"));
+    }
+
+    /**
+     * Creates a snapshot of the "out" message of the exchange.
+     *
+     * @see #copy(NormalizedMessage)
+     */
+    public static NormalizedMessage copyOut(MessageExchange exchange) throws Exception {
+        return copy(exchange.getMessage("out"));
+    }
+
+    /**
+     * Creates a snapshot of the "fault" message of the exchange.
+     *
+     * @see #copy(NormalizedMessage)
+     */
+    public static Fault copyFault(MessageExchange exchange) throws Exception {
+        return (Fault) copy(exchange.getMessage("fault"));
+    }
+
+    /** Transfers the "in" message of <code>source</code> to the "in" message of <code>dest</code>. */
+    public static void transferInToIn(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToIn(source.getMessage("in"), dest);
+    }
+
+    /** Transfers the "out" message of <code>source</code> to the "in" message of <code>dest</code>. */
+    public static void transferOutToIn(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToIn(source.getMessage("out"), dest);
+    }
+
+    /** Transfers the given message to the "in" message of <code>dest</code>. */
+    public static void transferToIn(NormalizedMessage sourceMsg, MessageExchange dest) throws Exception {
+        transferTo(sourceMsg, dest, "in");
+    }
+
+    /** Transfers the "out" message of <code>source</code> to the "out" message of <code>dest</code>. */
+    public static void transferOutToOut(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToOut(source.getMessage("out"), dest);
+    }
+
+    /** Transfers the "in" message of <code>source</code> to the "out" message of <code>dest</code>. */
+    public static void transferInToOut(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToOut(source.getMessage("in"), dest);
+    }
+
+    /** Transfers the given message to the "out" message of <code>dest</code>. */
+    public static void transferToOut(NormalizedMessage sourceMsg, MessageExchange dest) throws Exception {
+        transferTo(sourceMsg, dest, "out");
+    }
+
+    /** Transfers the fault of <code>source</code> to the "fault" message of <code>dest</code>. */
+    public static void transferFaultToFault(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToFault(source.getFault(), dest);
+    }
+
+    /** Transfers the given fault to the "fault" message of <code>dest</code>. */
+    public static void transferToFault(Fault fault, MessageExchange dest) throws Exception {
+        transferTo(fault, dest, "fault");
+    }
+
+    /**
+     * Creates a new message on <code>dest</code> (a fault when the source is a
+     * {@link Fault}, a plain message otherwise), fills it from
+     * <code>sourceMsg</code> and stores it on <code>dest</code> under the
+     * given message name.
+     *
+     * @param sourceMsg the message to transfer
+     * @param dest the exchange receiving the new message
+     * @param name the name under which the message is set, e.g. "in", "out" or "fault"
+     * @throws Exception if the message cannot be populated or set
+     */
+    public static void transferTo(NormalizedMessage sourceMsg, MessageExchange dest, String name) throws Exception {
+        NormalizedMessage destMsg = (sourceMsg instanceof Fault) ? dest.createFault() : dest.createMessage();
+        transfer(sourceMsg, destMsg);
+        dest.setMessage(destMsg, name);
+    }
+
+    /**
+     * Transfers the message registered under <code>name</code> on
+     * <code>source</code> to the message of the same name on <code>dest</code>.
+     *
+     * @param source the exchange to read the message from
+     * @param dest the exchange receiving the new message
+     * @param name the message name, used on both exchanges
+     * @throws Exception if the message cannot be populated or set
+     */
+    public static void transferTo(MessageExchange source, MessageExchange dest, String name) throws Exception {
+        // Same steps as the message-based overload, so delegate instead of duplicating them.
+        transferTo(source.getMessage(name), dest, name);
+    }
+
+    /**
+     * In-memory message used for snapshots. Content, properties, attachments
+     * and the security subject are held in plain fields.
+     */
+    private static class NormalizedMessageImpl implements NormalizedMessage {
+
+        private Subject subject;
+        private Source content;
+        private Map properties = new HashMap();
+        private Map attachments = new HashMap();
+
+        /**
+         * Builds a snapshot of the given message.
+         *
+         * @param message the message to copy
+         * @throws Exception if the content cannot be serialized or an
+         *             attachment cannot be read
+         */
+        public NormalizedMessageImpl(NormalizedMessage message) throws Exception {
+            // Serialize the content so the snapshot does not share the original
+            // Source. Only wrap the text when there is some: the serialized form
+            // may be null (presumably when the message carries no content), in
+            // which case the snapshot simply has no content either.
+            String text = new SourceTransformer().contentToString(message);
+            if (text != null) {
+                this.content = new StringSource(text);
+            }
+            for (Iterator it = message.getPropertyNames().iterator(); it.hasNext();) {
+                String name = (String) it.next();
+                this.properties.put(name, message.getProperty(name));
+            }
+            for (Iterator it = message.getAttachmentNames().iterator(); it.hasNext();) {
+                String name = (String) it.next();
+                this.attachments.put(name, toInMemoryHandler(message.getAttachment(name)));
+            }
+            this.subject = message.getSecuritySubject();
+        }
+
+        /**
+         * Returns a handler whose data is held in a byte array. A handler that
+         * is already backed by a ByteArrayDataSource is returned unchanged; any
+         * other data source is read fully into memory, keeping its content
+         * type and name.
+         */
+        private static DataHandler toInMemoryHandler(DataHandler handler) throws Exception {
+            DataSource ds = handler.getDataSource();
+            if (ds instanceof ByteArrayDataSource) {
+                return handler;
+            }
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            FileUtil.copyInputStream(ds.getInputStream(), baos);
+            ByteArrayDataSource bads = new ByteArrayDataSource(baos.toByteArray(), ds.getContentType());
+            bads.setName(ds.getName());
+            return new DataHandler(bads);
+        }
+
+        public void addAttachment(String id, DataHandler content) throws MessagingException {
+            this.attachments.put(id, content);
+        }
+
+        public Source getContent() {
+            return content;
+        }
+
+        public DataHandler getAttachment(String id) {
+            return (DataHandler) this.attachments.get(id);
+        }
+
+        /** Returns the key set of the attachment map itself, not a copy. */
+        public Set getAttachmentNames() {
+            return this.attachments.keySet();
+        }
+
+        public void removeAttachment(String id) throws MessagingException {
+            this.attachments.remove(id);
+        }
+
+        public void setContent(Source content) throws MessagingException {
+            this.content = content;
+        }
+
+        public void setProperty(String name, Object value) {
+            this.properties.put(name, value);
+        }
+
+        public void setSecuritySubject(Subject subject) {
+            this.subject = subject;
+        }
+
+        /** Returns the key set of the property map itself, not a copy. */
+        public Set getPropertyNames() {
+            return this.properties.keySet();
+        }
+
+        public Object getProperty(String name) {
+            return this.properties.get(name);
+        }
+
+        public Subject getSecuritySubject() {
+            return this.subject;
+        }
+
+    }
+
+    /** Snapshot of a fault; identical to a message snapshot but typed as a {@link Fault}. */
+    private static class FaultImpl extends NormalizedMessageImpl implements Fault {
+        public FaultImpl(Fault fault) throws Exception {
+            super(fault);
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.messaging.MessageExchange;
+
+/**
+ * A condition that can be evaluated against a message exchange.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public interface Predicate {
+
+    /**
+     * Tests the given exchange against this predicate.
+     *
+     * @param exchange the exchange to test
+     * @return <code>true</code> if the exchange matches, <code>false</code> otherwise
+     */
+    boolean matches(MessageExchange exchange);
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+/**
+ * A routing rule, as used by content based routers.
+ * If the rule predicate matches the MessageExchange, the
+ * target defined on the rule will be used as the destination for
+ * the given MessageExchange.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="routing-rule"
+ */
+public class RoutingRule {
+
+    private Predicate predicate;
+    private ExchangeTarget target;
+
+    /**
+     * Creates a rule with neither predicate nor target set.
+     */
+    public RoutingRule() {
+        this(null, null);
+    }
+
+    /**
+     * Creates a rule from a predicate and the target it selects.
+     *
+     * @param predicate the predicate of this rule
+     * @param target the target of this rule
+     */
+    public RoutingRule(Predicate predicate, ExchangeTarget target) {
+        this.predicate = predicate;
+        this.target = target;
+    }
+
+    /**
+     * @return the predicate of this rule
+     */
+    public Predicate getPredicate() {
+        return predicate;
+    }
+
+    /**
+     * @param predicate The predicate to set.
+     */
+    public void setPredicate(Predicate predicate) {
+        this.predicate = predicate;
+    }
+
+    /**
+     * @return the target of this rule
+     */
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target The target to set.
+     */
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,42 @@
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.expression.JAXPBooleanXPathExpression;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.w3c.dom.Node;
+
+/**
+ * A {@link Predicate} that evaluates a boolean XPath expression against a
+ * copy of the "in" message of the exchange.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="xpath-predicate"
+ */
+public class XPathPredicate extends JAXPBooleanXPathExpression implements Predicate {
+
+    private static final Log log = LogFactory.getLog(XPathPredicate.class);
+
+    /**
+     * No-argument constructor; nothing is configured here.
+     */
+    public XPathPredicate() {
+    }
+
+    /**
+     * Creates a predicate for the given XPath expression.
+     *
+     * @param xpath the expression, passed to the superclass constructor
+     * @throws Exception if the superclass constructor fails
+     */
+    public XPathPredicate(String xpath) throws Exception {
+        super(xpath);
+    }
+
+    /**
+     * Evaluates the expression against a copy of the "in" message. Any
+     * exception raised while copying or evaluating is logged as a warning
+     * and treated as "no match".
+     *
+     * @param exchange the exchange to test
+     * @return <code>true</code> only if the evaluation yields {@link Boolean#TRUE}
+     */
+    public boolean matches(MessageExchange exchange) {
+        Boolean outcome;
+        try {
+            // Work on a snapshot, presumably so that evaluation cannot consume
+            // the content of the original message.
+            NormalizedMessage snapshot = MessageUtil.copyIn(exchange);
+            outcome = (Boolean) evaluate(exchange, snapshot);
+        } catch (Exception e) {
+            log.warn("Could not evaluate xpath expression", e);
+            return false;
+        }
+        return Boolean.TRUE.equals(outcome);
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import java.io.ByteArrayInputStream;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+/**
+ * Base class for the EIP tests. Each test runs against a freshly started
+ * embedded container; after each test the exchange listener is asked to
+ * assert that exchanges completed and the container is shut down.
+ * A set of small mock components with canned replies is provided for use as
+ * routing targets.
+ */
+public abstract class AbstractEIPTest extends TestCase {
+
+    protected JBIContainer jbi;
+    protected DefaultServiceMixClient client;
+    protected ExchangeCompletedListener listener;
+
+    /**
+     * Creates, configures and starts the embedded container, registers the
+     * exchange listener and creates a client bound to the container.
+     */
+    protected void setUp() throws Exception {
+        jbi = new JBIContainer();
+        jbi.setEmbedded(true);
+        jbi.setUseMBeanServer(false);
+        configureContainer();
+        listener = new ExchangeCompletedListener();
+        jbi.addListener(listener);
+
+        jbi.init();
+        jbi.start();
+
+        client = new DefaultServiceMixClient(jbi);
+
+        //LogManager.getLogger(DeliveryChannel.class).setLevel(Level.OFF);
+    }
+
+    /**
+     * Asserts through the listener that exchanges completed, then shuts the
+     * container down.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            listener.assertExchangeCompleted();
+        } finally {
+            // Shut the container down even when the assertion above fails, so
+            // that a failing test does not leave a started container behind.
+            jbi.shutDown();
+        }
+    }
+
+    /**
+     * Hook for subclasses to adjust the container before it is initialized.
+     * This implementation sets the flow name to "st".
+     */
+    protected void configureContainer() throws Exception {
+        jbi.setFlowName("st");
+    }
+
+    /**
+     * Creates an exchange target that addresses the given service.
+     *
+     * @param name the service name to target
+     * @return a new target with only the service set
+     */
+    protected ExchangeTarget createServiceExchangeTarget(QName name) {
+        ExchangeTarget target = new ExchangeTarget();
+        target.setService(name);
+        return target;
+    }
+
+    /**
+     * Creates a receiver component and activates it under the given name.
+     *
+     * @param name used both as service local name and as component name
+     * @return the activated receiver
+     */
+    protected ReceiverComponent activateReceiver(String name) throws Exception {
+        ReceiverComponent receiver = new ReceiverComponent();
+        activateComponent(receiver, name);
+        return receiver;
+    }
+
+    /**
+     * Wraps the EIP endpoint in a new EIP component and activates that
+     * component. The endpoint gets the service name <code>name</code> and the
+     * endpoint name "ep".
+     *
+     * @param endpoint the endpoint to deploy
+     * @param name used both as service local name and as component name
+     */
+    protected void activateComponent(EIPEndpoint endpoint, String name) throws Exception {
+        EIPSpringComponent eip = new EIPSpringComponent();
+        endpoint.setService(new QName(name));
+        endpoint.setEndpoint("ep");
+        eip.setEndpoints(new EIPEndpoint[] { endpoint });
+        jbi.activateComponent(eip, name);
+    }
+
+    /**
+     * Activates the component with the service name <code>name</code> and the
+     * endpoint name "ep".
+     *
+     * @param component the component to activate
+     * @param name used both as service local name and as component name
+     */
+    protected void activateComponent(ComponentSupport component, String name) throws Exception {
+        component.setService(new QName(name));
+        component.setEndpoint("ep");
+        jbi.activateComponent(component, name);
+    }
+
+    /**
+     * Wraps the given text in a stream source.
+     *
+     * @param msg the message text
+     * @return a source reading the bytes of the text
+     */
+    protected static Source createSource(String msg) {
+        // getBytes() without argument encodes with the platform default charset.
+        return new StreamSource(new ByteArrayInputStream(msg.getBytes()));
+    }
+
+    /**
+     * For an active exchange: answers with <code>&lt;outMsg/&gt;</code> while
+     * there is no out message, then fails with a <code>&lt;fault/&gt;</code>
+     * fault while there is no fault, and otherwise marks the exchange done.
+     */
+    protected static class ReturnOutComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (exchange.getMessage("out") == null) {
+                    NormalizedMessage out = exchange.createMessage();
+                    out.setContent(createSource("<outMsg/>"));
+                    answer(exchange, out);
+                } else if (exchange.getFault() == null) {
+                    Fault fault = exchange.createFault();
+                    fault.setContent(createSource("<fault/>"));
+                    fail(exchange, fault);
+                } else {
+                    done(exchange);
+                }
+            }
+        }
+    }
+
+    /**
+     * Answers every active exchange with the response text given at
+     * construction time.
+     */
+    protected static class ReturnMockComponent extends ComponentSupport implements MessageExchangeListener {
+        private String response;
+        public ReturnMockComponent(String response) {
+            this.response = response;
+        }
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                NormalizedMessage out = exchange.createMessage();
+                out.setContent(createSource(response));
+                answer(exchange, out);
+            }
+        }
+    }
+
+    /**
+     * For an active exchange: answers with <code>&lt;outMsg/&gt;</code> while
+     * there is no out message, and fails with a "Dummy error" exception
+     * otherwise.
+     */
+    protected static class ReturnOutAndErrorComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (exchange.getMessage("out") == null) {
+                    NormalizedMessage out = exchange.createMessage();
+                    out.setContent(createSource("<outMsg/>"));
+                    answer(exchange, out);
+                } else {
+                    fail(exchange, new Exception("Dummy error"));
+                }
+            }
+        }
+    }
+
+    /**
+     * Fails every active exchange with a "Dummy error" exception.
+     */
+    protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, new Exception("Dummy error"));
+            }
+        }
+    }
+
+    /**
+     * Fails every active exchange with a <code>&lt;fault/&gt;</code> fault.
+     */
+    protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                Fault fault = exchange.createFault();
+                fault.setContent(createSource("<fault/>"));
+                fail(exchange, fault);
+            }
+        }
+    }
+
+}