You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/04/15 01:38:46 UTC
svn commit: r394227 [2/3] - in /incubator/servicemix/trunk: ./
servicemix-assembly/
servicemix-common/src/main/java/org/apache/servicemix/common/
servicemix-core/src/main/java/org/apache/servicemix/
servicemix-core/src/main/java/org/apache/servicemix/e...
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import java.net.URI;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+import org.apache.servicemix.jbi.FaultException;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+
+/**
+ * The Pipeline component is a bridge between an In-Only (or Robust-In-Only) MEP and
+ * an In-Out MEP.
+ * When the Pipeline receives an In-Only MEP, it will send the input in an In-Out MEP
+ * to the tranformer destination and forward the response in an In-Only MEP to the target
+ * destination.
+ * In addition, this component is fully asynchronous and uses an exchange store to provide
+ * full HA and recovery for clustered / persistent flows.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="pipeline"
+ * description="A Pipeline"
+ */
+public class Pipeline extends EIPEndpoint {
+
+ private static final Log log = LogFactory.getLog(Pipeline.class);
+
+ private static final String TRANSFORMER = "Pipeline.Transformer";
+
+ private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP";
+
+ /**
+ * The adress of the in-out endpoint acting as a transformer
+ */
+ private ExchangeTarget transformer;
+
+ /**
+ * The address of the target endpoint
+ */
+ private ExchangeTarget target;
+
+ /**
+ * The correlation property used by this component
+ */
+ private String correlationConsumer;
+
+ /**
+ * The correlation property used by this component
+ */
+ private String correlationTransformer;
+
+ /**
+ * The correlation property used by this component
+ */
+ private String correlationTarget;
+
+ /**
+ * @return Returns the target.
+ */
+ public ExchangeTarget getTarget() {
+ return target;
+ }
+
+ /**
+ * @param target The target to set.
+ */
+ public void setTarget(ExchangeTarget target) {
+ this.target = target;
+ }
+
+ /**
+ * @return Returns the transformer.
+ */
+ public ExchangeTarget getTransformer() {
+ return transformer;
+ }
+
+ /**
+ * @param transformer The transformer to set.
+ */
+ public void setTransformer(ExchangeTarget transformer) {
+ this.transformer = transformer;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+ */
+ public void validate() throws DeploymentException {
+ super.validate();
+ // Check target
+ if (target == null) {
+ throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
+ }
+ // Check transformer
+ if (transformer == null) {
+ throw new IllegalArgumentException("transformer should be set to a valid ExchangeTarget");
+ }
+ // Create correlation properties
+ correlationConsumer = "Pipeline.Consumer." + getService() + "." + getEndpoint();
+ correlationTransformer = "Pipeline.Transformer." + getService() + "." + getEndpoint();
+ correlationTarget = "Pipeline.Target." + getService() + "." + getEndpoint();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ */
+ public void process(MessageExchange exchange) throws MessagingException {
+ try {
+ // The exchange comes from the consumer
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // A DONE status from the consumer can only be received
+ // when a fault has been sent
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ String transformerId = (String) exchange.getProperty(correlationTransformer);
+ String targetId = (String) exchange.getProperty(correlationTarget);
+ if (transformerId == null && targetId == null) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
+ }
+ // Load the exchange
+ MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+ done(me);
+ // Errors must be sent back to the target or transformer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ String transformerId = (String) exchange.getProperty(correlationTransformer);
+ String targetId = (String) exchange.getProperty(correlationTarget);
+ if (transformerId == null && targetId == null) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
+ }
+ // Load the exchange
+ MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+ fail(me, exchange.getError());
+ // This is a new exchange
+ } else if (exchange.getProperty(correlationTransformer) == null) {
+ if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ return;
+ }
+ // Create exchange for target
+ MessageExchange tme = exchangeFactory.createInOutExchange();
+ transformer.configureTarget(tme, getContext());
+ // Set correlations
+ exchange.setProperty(correlationTransformer, tme.getExchangeId());
+ tme.setProperty(correlationConsumer, exchange.getExchangeId());
+ tme.setProperty(TRANSFORMER, Boolean.TRUE);
+ tme.setProperty(CONSUMER_MEP, exchange.getPattern());
+ // Put exchange to store
+ store.store(exchange.getExchangeId(), exchange);
+ // Send in to listener and target
+ MessageUtil.transferInToIn(exchange, tme);
+ send(tme);
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+ }
+ // If the exchange comes from the transformer
+ } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
+ // Retrieve the correlation id
+ String consumerId = (String) exchange.getProperty(correlationConsumer);
+ if (consumerId == null) {
+ throw new IllegalStateException(correlationConsumer + " property not found");
+ }
+ // This should not happen beacause the MEP is an In-Out
+ // and the DONE status is always sent by the consumer (us)
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ throw new IllegalStateException("Received a DONE status from the transformer");
+ // Errors must be sent back to the consumer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ MessageExchange me = (MessageExchange) store.load(consumerId);
+ fail(me, exchange.getError());
+ // Faults must be sent back to the consumer
+ } else if (exchange.getFault() != null) {
+ MessageExchange me = (MessageExchange) store.load(consumerId);
+ if (me instanceof InOnly) {
+ // Do not use the fault has it may contain streams
+ // So just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(exchange.getFault());
+ fail(me, new FaultException(fault, null, null));
+ done(exchange);
+ } else {
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferFaultToFault(exchange, me);
+ send(me);
+ }
+ // This is the answer from the transformer
+ } else if (exchange.getMessage("out") != null) {
+ // Retrieve the consumer MEP
+ URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
+ if (mep == null) {
+ throw new IllegalStateException("Exchange does not carry the consumer MEP");
+ }
+ MessageExchange me = exchangeFactory.createExchange(mep);
+ target.configureTarget(me, getContext());
+ me.setProperty(correlationConsumer, consumerId);
+ me.setProperty(correlationTransformer, exchange.getExchangeId());
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferOutToIn(exchange, me);
+ send(me);
+ // This should not happen
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+ }
+ // The exchange comes from the target
+ } else {
+ // Retrieve the correlation id for the consumer
+ String consumerId = (String) exchange.getProperty(correlationConsumer);
+ if (consumerId == null) {
+ throw new IllegalStateException(correlationConsumer + " property not found");
+ }
+ // Retrieve the correlation id for the transformer
+ String transformerId = (String) exchange.getProperty(correlationTransformer);
+ if (transformerId == null) {
+ throw new IllegalStateException(correlationTransformer + " property not found");
+ }
+ // This should be the last message received
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // Need to ack the transformer
+ MessageExchange tme = (MessageExchange) store.load(transformerId);
+ done(tme);
+ // Need to ack the consumer
+ MessageExchange cme = (MessageExchange) store.load(consumerId);
+ done(cme);
+ // Errors should be sent back to the consumer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ // Need to ack the transformer
+ MessageExchange tme = (MessageExchange) store.load(transformerId);
+ done(tme);
+ // Send error to consumer
+ MessageExchange cme = (MessageExchange) store.load(consumerId);
+ fail(cme, exchange.getError());
+ // If we have a robust-in-only MEP, we can receive a fault
+ } else if (exchange.getFault() != null) {
+ // Need to ack the transformer
+ MessageExchange tme = (MessageExchange) store.load(transformerId);
+ done(tme);
+ // Send fault back to consumer
+ store.store(exchange.getExchangeId(), exchange);
+ MessageExchange cme = (MessageExchange) store.load(consumerId);
+ cme.setProperty(correlationTarget, exchange.getExchangeId());
+ MessageUtil.transferFaultToFault(exchange, cme);
+ send(cme);
+ // This should not happen
+ } else {
+ throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
+ }
+ }
+ // If an error occurs, log it and report the error back to the sender
+ // if the exchange is still ACTIVE
+ } catch (Exception e) {
+ log.error("An exception occured while processing exchange", e);
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, e);
+ }
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+
+/**
+ * The StaticRecipientList component will forward an input In-Only or Robust-In-Only
+ * exchange to a list of known recipients.
+ * This component implements the
+ * <a href="http://www.enterpriseintegrationpatterns.com/RecipientList.html">Recipient List</a>
+ * pattern, with the limitation that the recipient list is static.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="static-recipient-list"
+ * description="A static Recipient List"
+ */
+public class StaticRecipientList extends EIPEndpoint {
+
+ private static final Log log = LogFactory.getLog(WireTap.class);
+
+ /**
+ * List of recipients
+ */
+ private ExchangeTarget[] recipients;
+ /**
+ * Indicates if faults and errors from recipients should be sent
+ * back to the consumer. In such a case, only the first fault or
+ * error received will be reported.
+ * Note that if the consumer is synchronous, it will be blocked
+ * until all recipients successfully acked the exchange, or
+ * a fault or error is reported, and the exchange will be kept in the
+ * store for recovery.
+ */
+ private boolean reportErrors;
+ /**
+ * The correlation property used by this component
+ */
+ private String correlation;
+
+ /**
+ * @return Returns the recipients.
+ */
+ public ExchangeTarget[] getRecipients() {
+ return recipients;
+ }
+
+ /**
+ * @param recipients The recipients to set.
+ */
+ public void setRecipients(ExchangeTarget[] recipients) {
+ this.recipients = recipients;
+ }
+
+ /**
+ * @return Returns the reportErrors.
+ */
+ public boolean isReportErrors() {
+ return reportErrors;
+ }
+
+ /**
+ * @param reportErrors The reportErrors to set.
+ */
+ public void setReportErrors(boolean reportErrors) {
+ this.reportErrors = reportErrors;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+ */
+ public void validate() throws DeploymentException {
+ super.validate();
+ // Check recipients
+ if (recipients == null || recipients.length == 0) {
+ throw new IllegalArgumentException("recipients should contain at least one ExchangeTarget");
+ }
+ // Create correlation property
+ correlation = "StaticRecipientList.Correlation." + getService() + "." + getEndpoint();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ */
+ public void process(MessageExchange exchange) throws MessagingException {
+ try {
+ // If we need to report errors, the behavior is really different,
+ // as we need to keep the incoming exchange in the store until
+ // all acks have been received
+ if (reportErrors) {
+ // TODO: implement this
+ throw new UnsupportedOperationException("Not implemented");
+ // We are in a simple fire-and-forget behaviour.
+ // This implementation is really efficient as we do not use
+ // the store at all.
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ } else if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ } else if (exchange.getFault() != null) {
+ done(exchange);
+ } else {
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ for (int i = 0; i < recipients.length; i++) {
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ recipients[i].configureTarget(me, getContext());
+ MessageUtil.transferToIn(in, me);
+ send(me);
+ }
+ done(exchange);
+ }
+ }
+ // If an error occurs, log it and report the error back to the sender
+ // if the exchange is still ACTIVE
+ } catch (Exception e) {
+ log.error("An exception occured while processing exchange", e);
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, e);
+ }
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+
+/**
+ * A RoutingSlip component can be used to route an incoming In-Out exchange
+ * through a series of target services.
+ * This component implements the
+ * <a href="http://www.enterpriseintegrationpatterns.com/RoutingTable.html">Routing Slip</a>
+ * pattern, with the limitation that the routing table is static.
+ * This component only uses In-Out MEPs and errors or faults sent by targets are reported
+ * back to the consumer, thus interrupting the routing process.
+ * In addition, this component is fully asynchronous and uses an exchange store to provide
+ * full HA and recovery for clustered / persistent flows.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="static-routing-slip"
+ *                  description="A static Routing Slip"
+ */
+public class StaticRoutingSlip extends EIPEndpoint {
+
+    // Log under this component's own category
+    private static final Log log = LogFactory.getLog(StaticRoutingSlip.class);
+
+    /**
+     * List of target components used in the RoutingSlip
+     */
+    private ExchangeTarget[] targets;
+    /**
+     * Name of the exchange property holding the id of the correlated exchange:
+     * the consumer exchange id on target exchanges, and the id of the last
+     * target exchange on the consumer exchange
+     */
+    private String correlation;
+    /**
+     * Name of the exchange property holding the index of the current target
+     */
+    private String index;
+    /**
+     * Name of the exchange property holding the id of the previous target exchange
+     */
+    private String previous;
+
+    /**
+     * @return Returns the targets.
+     */
+    public ExchangeTarget[] getTargets() {
+        return targets;
+    }
+
+    /**
+     * @param targets The targets to set.
+     */
+    public void setTargets(ExchangeTarget[] targets) {
+        this.targets = targets;
+    }
+
+    /**
+     * Checks that at least one target has been configured and computes the
+     * names of the exchange properties from the service and endpoint names.
+     *
+     * @throws IllegalArgumentException if no target is set
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+     */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Check target
+        if (targets == null || targets.length == 0) {
+            throw new IllegalArgumentException("targets should contain at least one ExchangeTarget");
+        }
+        // Create correlation properties
+        correlation = "RoutingSlip.Correlation." + getService() + "." + getEndpoint();
+        index = "RoutingSlip.Index." + getService() + "." + getEndpoint();
+        previous = "RoutingSlip.Previous." + getService() + "." + getEndpoint();
+    }
+
+    /**
+     * Processes an exchange coming either from the consumer or from one of the targets.
+     * <ul>
+     *   <li>Provider role: the exchange comes from the consumer. A new In-Out exchange
+     *       is stored and its In message is sent to the first target; a DONE or ERROR
+     *       status acks the last target exchange.</li>
+     *   <li>Otherwise: the exchange comes back from a target. Its Out message is sent
+     *       to the next target, or back to the consumer if it was the last one; errors
+     *       and faults are sent back to the consumer. In all cases the previous target
+     *       exchange, if any, is acked.</li>
+     * </ul>
+     * Any exception is logged and, if the exchange is still ACTIVE, reported back
+     * to the sender as an error.
+     *
+     * @param exchange the exchange to process
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     */
+    public void process(MessageExchange exchange) throws MessagingException {
+        try {
+            // This exchange comes from the consumer
+            if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    String correlationId = (String) exchange.getProperty(correlation);
+                    if (correlationId == null) {
+                        throw new IllegalStateException(correlation + " property not found");
+                    }
+                    // Ack last target hit
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    done(me);
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    String correlationId = (String) exchange.getProperty(correlation);
+                    if (correlationId == null) {
+                        throw new IllegalStateException(correlation + " property not found");
+                    }
+                    // Ack last target hit
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    done(me);
+                } else if (exchange instanceof InOut == false) {
+                    throw new IllegalStateException("Use an InOut MEP");
+                } else {
+                    // New exchange: send the In message to the first target
+                    MessageExchange me = exchangeFactory.createInOutExchange();
+                    me.setProperty(correlation, exchange.getExchangeId());
+                    me.setProperty(index, new Integer(0));
+                    targets[0].configureTarget(me, getContext());
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferInToIn(exchange, me);
+                    send(me);
+                }
+            // The exchange comes from a target
+            } else {
+                String correlationId = (String) exchange.getProperty(correlation);
+                String previousId = (String) exchange.getProperty(previous);
+                Integer prevIndex = (Integer) exchange.getProperty(index);
+                if (correlationId == null) {
+                    throw new IllegalStateException(correlation + " property not found");
+                }
+                if (prevIndex == null) {
+                    // Name the property which is actually missing
+                    throw new IllegalStateException(index + " property not found");
+                }
+                // This should never happen, as we can only send DONE
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
+                // ERROR are sent back to the consumer
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    fail(me, exchange.getError());
+                    // Ack the previous target
+                    if (previousId != null) {
+                        me = (MessageExchange) store.load(previousId);
+                        done(me);
+                    }
+                // Faults are sent back to the consumer
+                } else if (exchange.getFault() != null) {
+                    MessageExchange me = (MessageExchange) store.load(correlationId);
+                    // Keep the target exchange so that it can be acked
+                    // when the consumer sends its status
+                    me.setProperty(correlation, exchange.getExchangeId());
+                    store.store(exchange.getExchangeId(), exchange);
+                    MessageUtil.transferFaultToFault(exchange, me);
+                    send(me);
+                    // Ack the previous target
+                    if (previousId != null) {
+                        me = (MessageExchange) store.load(previousId);
+                        done(me);
+                    }
+                // Out message, give it to next target or back to consumer
+                } else if (exchange.getMessage("out") != null) {
+                    // This is the answer from the last target
+                    if (prevIndex.intValue() == targets.length - 1) {
+                        MessageExchange me = (MessageExchange) store.load(correlationId);
+                        me.setProperty(correlation, exchange.getExchangeId());
+                        store.store(exchange.getExchangeId(), exchange);
+                        MessageUtil.transferOutToOut(exchange, me);
+                        send(me);
+                        // Ack the previous target
+                        if (previousId != null) {
+                            me = (MessageExchange) store.load(previousId);
+                            done(me);
+                        }
+                    // We still have a target to hit
+                    } else {
+                        MessageExchange me = exchangeFactory.createInOutExchange();
+                        Integer curIndex = new Integer(prevIndex.intValue() + 1);
+                        me.setProperty(correlation, correlationId);
+                        me.setProperty(index, curIndex);
+                        me.setProperty(previous, exchange.getExchangeId());
+                        targets[curIndex.intValue()].configureTarget(me, getContext());
+                        store.store(exchange.getExchangeId(), exchange);
+                        MessageUtil.transferOutToIn(exchange, me);
+                        send(me);
+                        // Ack the previous target
+                        if (previousId != null) {
+                            me = (MessageExchange) store.load(previousId);
+                            done(me);
+                        }
+                    }
+                // This should not happen
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+                }
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.eip.support.MessageUtil;
+import org.apache.servicemix.store.Store;
+
+/**
+ * A WireTap component can be used to forward a copy of the input message to a listener.
+ * This component implements the
+ * <a href="http://www.enterpriseintegrationpatterns.com/WireTap.html">WireTap</a>
+ * pattern.
+ * It can handle all 4 standard MEPs, but will only send an In-Only MEP to the listener.
+ * Separate listeners can be configured for the in, out and fault messages.
+ * In addition, this component is fully asynchronous and uses an exchange store to provide
+ * full HA and recovery for clustered / persistent flows.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="wire-tap"
+ * description="A WireTap"
+ */
+public class WireTap extends EIPEndpoint {
+
+ private static final Log log = LogFactory.getLog(WireTap.class);
+
+ /**
+ * The main target destination which will receive the exchange.
+ * This property is mandatory and checked in validate().
+ */
+ private ExchangeTarget target;
+ /**
+ * The listener destination for in messages.
+ * May be null, in which case in messages are only forwarded to the target.
+ */
+ private ExchangeTarget inListener;
+ /**
+ * The listener destination for out messages.
+ * May be null, in which case out messages are only forwarded to the consumer.
+ */
+ private ExchangeTarget outListener;
+ /**
+ * The listener destination for fault messages.
+ * May be null, in which case faults are only forwarded to the other side.
+ */
+ private ExchangeTarget faultListener;
+ /**
+ * The name of the correlation property used by this component.
+ * On each side of the wire tap, this property holds the id of the
+ * exchange used on the other side.
+ */
+ private String correlation;
+
+ /**
+ * @return Returns the target.
+ */
+ public ExchangeTarget getTarget() {
+ return target;
+ }
+
+ /**
+ * @param target The target to set.
+ */
+ public void setTarget(ExchangeTarget target) {
+ this.target = target;
+ }
+
+ /**
+ * @return Returns the faultListener.
+ */
+ public ExchangeTarget getFaultListener() {
+ return faultListener;
+ }
+
+ /**
+ * @param faultListener The faultListener to set.
+ */
+ public void setFaultListener(ExchangeTarget faultListener) {
+ this.faultListener = faultListener;
+ }
+
+ /**
+ * @return Returns the inListener.
+ */
+ public ExchangeTarget getInListener() {
+ return inListener;
+ }
+
+ /**
+ * @param inListener The inListener to set.
+ */
+ public void setInListener(ExchangeTarget inListener) {
+ this.inListener = inListener;
+ }
+
+ /**
+ * @return Returns the outListener.
+ */
+ public ExchangeTarget getOutListener() {
+ return outListener;
+ }
+
+ /**
+ * @param outListener The outListener to set.
+ */
+ public void setOutListener(ExchangeTarget outListener) {
+ this.outListener = outListener;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+ *
+ * Checks that the target has been configured (an IllegalArgumentException
+ * is thrown otherwise) and computes the name of the correlation property
+ * from the service and endpoint names. Listeners are optional.
+ */
+ public void validate() throws DeploymentException {
+ super.validate();
+ // Check target
+ if (target == null) {
+ throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
+ }
+ // Create correlation property
+ correlation = "WireTap.Correlation." + getService() + "." + getEndpoint();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ *
+ * A new exchange received from the consumer (provider role, no correlation
+ * property yet) is stored and mirrored by a new exchange of the same MEP
+ * sent to the target, the in message being also copied to the in listener.
+ * Any other exchange must carry the correlation property: the correlated
+ * exchange is loaded from the store and the status, fault or out message
+ * is reproduced on it, faults and out messages being also copied to the
+ * matching listener.
+ * Any exception is logged and, if the exchange is still ACTIVE, reported
+ * back to the sender as an error.
+ */
+ public void process(MessageExchange exchange) throws MessagingException {
+ try {
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+ exchange.getProperty(correlation) == null) {
+ // Create exchange for target, using the same MEP as the incoming exchange
+ MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+ if (store.hasFeature(Store.CLUSTERED)) {
+ exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+ tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+ }
+ target.configureTarget(tme, getContext());
+ // Set correlations: each exchange holds the id of the other one
+ exchange.setProperty(correlation, tme.getExchangeId());
+ tme.setProperty(correlation, exchange.getExchangeId());
+ // Put exchange to store
+ store.store(exchange.getExchangeId(), exchange);
+ // Send in to listener and target
+ sendToTargetAndListener(exchange, tme, inListener, "in");
+ // Mimic the exchange on the other side and send to needed listener
+ } else {
+ String id = (String) exchange.getProperty(correlation);
+ if (id == null) {
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
+ exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ // Listener exchanges carry no correlation: this must be a listener status, so ignore
+ return;
+ }
+ throw new IllegalStateException(correlation + " property not found");
+ }
+ MessageExchange org = (MessageExchange) store.load(id); // exchange used on the other side
+ if (org == null) {
+ throw new IllegalStateException("Could not load original exchange with id " + id);
+ }
+ // Reproduce DONE status to the other side
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(org);
+ // Reproduce ERROR status to the other side
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(org, exchange.getError());
+ // Reproduce faults to the other side and to the fault listener
+ } else if (exchange.getFault() != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ sendToTargetAndListener(exchange, org, faultListener, "fault");
+ // Reproduce answers to the other side and to the out listener
+ } else if (exchange.getMessage("out") != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ sendToTargetAndListener(exchange, org, outListener, "out");
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+ }
+ }
+ // If an error occurs, log it and report the error back to the sender
+ // if the exchange is still ACTIVE
+ } catch (Exception e) {
+ log.error("An exception occured while processing exchange", e);
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, e);
+ }
+ }
+ }
+
+ private void sendToTargetAndListener(MessageExchange source, // exchange holding the message to forward
+ MessageExchange dest, // exchange on which the message is reproduced and sent
+ ExchangeTarget listener, // optional listener destination, may be null
+ String message) throws Exception { // name of the message to forward ("in", "out" or "fault")
+ if (listener != null) {
+ NormalizedMessage msg = MessageUtil.copy(source.getMessage(message)); // copy used for both the listener and dest
+ InOnly lme = exchangeFactory.createInOnlyExchange(); // listeners always receive an In-Only exchange
+ if (store.hasFeature(Store.CLUSTERED)) {
+ lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+ }
+ listener.configureTarget(lme, getContext());
+ MessageUtil.transferToIn(msg, lme);
+ send(lme);
+ MessageUtil.transferTo(msg, dest, message);
+ send(dest);
+ } else {
+ MessageUtil.transferTo(source, dest, message); // no listener: forward the message from source to dest
+ send(dest);
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/XPathSplitter.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import javax.jbi.management.DeploymentException;
+import javax.xml.namespace.NamespaceContext;
+import javax.xml.transform.Source;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.xpath.XPathFactory;
+import javax.xml.xpath.XPathFunctionResolver;
+
+import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.expression.JAXPNodeSetXPathExpression;
+import org.apache.servicemix.expression.MessageVariableResolver;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ * The XPathSplitter component implements the
+ * <a href="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a>
+ * pattern using an xpath expression to split the incoming xml.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="xpath-splitter"
+ * description="An XPath Splitter"
+ */
+public class XPathSplitter extends AbstractSplitter {
+
+ /**
+ * The xpath expression to use to split
+ */
+ private JAXPNodeSetXPathExpression xpathExpression = new JAXPNodeSetXPathExpression();
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+ */
+ public void validate() throws DeploymentException {
+ super.validate();
+ // Check xpath expression
+ try {
+ xpathExpression.afterPropertiesSet();
+ } catch (Exception e) {
+ throw new DeploymentException("Error validating xpath expression", e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.components.eip.AbstractSplitter#split(javax.xml.transform.Source)
+ */
+ protected Source[] split(Source main) throws Exception {
+ Node doc = new SourceTransformer().toDOMNode(main);
+ NodeList nodes = (NodeList) xpathExpression.evaluateXPath(doc);
+ Source[] parts = new Source[nodes.getLength()];
+ for (int i = 0; i < parts.length; i++) {
+ parts[i] = new DOMSource(nodes.item(i));
+ }
+ return parts;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#getFactory()
+ */
+ public XPathFactory getFactory() {
+ return xpathExpression.getFactory();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#getFunctionResolver()
+ */
+ public XPathFunctionResolver getFunctionResolver() {
+ return xpathExpression.getFunctionResolver();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#getNamespaceContext()
+ */
+ public NamespaceContext getNamespaceContext() {
+ return xpathExpression.getNamespaceContext();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#getTransformer()
+ */
+ public SourceTransformer getTransformer() {
+ return xpathExpression.getTransformer();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#getVariableResolver()
+ */
+ public MessageVariableResolver getVariableResolver() {
+ return xpathExpression.getVariableResolver();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#getXPath()
+ */
+ public String getXPath() {
+ return xpathExpression.getXPath();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#setFactory(javax.xml.xpath.XPathFactory)
+ */
+ public void setFactory(XPathFactory factory) {
+ xpathExpression.setFactory(factory);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#setFunctionResolver(javax.xml.xpath.XPathFunctionResolver)
+ */
+ public void setFunctionResolver(XPathFunctionResolver functionResolver) {
+ xpathExpression.setFunctionResolver(functionResolver);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#setNamespaceContext(javax.xml.namespace.NamespaceContext)
+ */
+ public void setNamespaceContext(NamespaceContext namespaceContext) {
+ xpathExpression.setNamespaceContext(namespaceContext);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#setTransformer(org.apache.servicemix.jbi.jaxp.SourceTransformer)
+ */
+ public void setTransformer(SourceTransformer transformer) {
+ xpathExpression.setTransformer(transformer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.expression.JAXPXPathExpression#setVariableResolver(org.apache.servicemix.expression.MessageVariableResolver)
+ */
+ public void setVariableResolver(MessageVariableResolver variableResolver) {
+ xpathExpression.setVariableResolver(variableResolver);
+ }
+
+ /**
+ * @org.apache.xbean.Property alias="xpath"
+ */
+ public void setXPath(String xpath) {
+ xpathExpression.setXPath(xpath);
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+import javax.xml.transform.Source;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+
+/**
+ * The AbstractSplitter is an abstract base class for Splitters.
+ * This component implements the
+ * <a href="http://www.enterpriseintegrationpatterns.com/Sequencer.html">Splitter</a>
+ * pattern.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public abstract class AbstractSplitter extends EIPEndpoint {
+
+ private static final Log log = LogFactory.getLog(AbstractSplitter.class);
+
+ /**
+ * The address of the target endpoint
+ */
+ private ExchangeTarget target;
+ /**
+ * Indicates if faults and errors from splitted parts should be sent
+ * back to the consumer. In such a case, only the first fault or
+ * error received will be reported.
+ * Note that if the consumer is synchronous, it will be blocked
+ * until all parts have been successfully acked, or
+ * a fault or error is reported, and the exchange will be kept in the
+ * store for recovery.
+ */
+ private boolean reportErrors;
+ /**
+ * Indicates if incoming attachments should be forwarded with the new exchanges.
+ */
+ private boolean forwardAttachments;
+ /**
+ * Indicates if properties on the incoming message should be forwarded.
+ */
+ private boolean forwardProperties;
+ /**
+ * The correlation property used by this component
+ */
+ private String correlation;
+
+ /**
+ * @return Returns the reportErrors.
+ */
+ public boolean isReportErrors() {
+ return reportErrors;
+ }
+
+ /**
+ * @param reportErrors The reportErrors to set.
+ */
+ public void setReportErrors(boolean reportErrors) {
+ this.reportErrors = reportErrors;
+ }
+
+ /**
+ * @return Returns the target.
+ */
+ public ExchangeTarget getTarget() {
+ return target;
+ }
+
+ /**
+ * @param target The target to set.
+ */
+ public void setTarget(ExchangeTarget target) {
+ this.target = target;
+ }
+
+ /**
+ * @return Returns the forwardAttachments.
+ */
+ public boolean isForwardAttachments() {
+ return forwardAttachments;
+ }
+
+ /**
+ * @param forwardAttachments The forwardAttachments to set.
+ */
+ public void setForwardAttachments(boolean forwardAttachments) {
+ this.forwardAttachments = forwardAttachments;
+ }
+
+ /**
+ * @return Returns the forwardProperties.
+ */
+ public boolean isForwardProperties() {
+ return forwardProperties;
+ }
+
+ /**
+ * @param forwardProperties The forwardProperties to set.
+ */
+ public void setForwardProperties(boolean forwardProperties) {
+ this.forwardProperties = forwardProperties;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+ */
+ public void validate() throws DeploymentException {
+ super.validate();
+ // Check target
+ if (target == null) {
+ throw new IllegalArgumentException("target should be set to a valid ExchangeTarget");
+ }
+ // Create correlation property
+ correlation = "Splitter.Correlation." + getContext().getComponentName();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ */
+ public void process(MessageExchange exchange) throws MessagingException {
+ try {
+ // If we need to report errors, the behavior is really different,
+ // as we need to keep the incoming exchange in the store until
+ // all acks have been received
+ if (reportErrors) {
+ // TODO: implement this
+ throw new UnsupportedOperationException("Not implemented");
+ // We are in a simple fire-and-forget behaviour.
+ // This implementation is really efficient as we do not use
+ // the store at all.
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ } else if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ } else if (exchange.getFault() != null) {
+ done(exchange);
+ } else {
+ MessageExchange[] parts = createParts(exchange);
+ for (int i = 0; i < parts.length; i++) {
+ target.configureTarget(parts[i], getContext());
+ send(parts[i]);
+ }
+ done(exchange);
+ }
+ }
+ // If an error occurs, log it and report the error back to the sender
+ // if the exchange is still ACTIVE
+ } catch (Exception e) {
+ log.error("An exception occured while processing exchange", e);
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, e);
+ }
+ }
+ }
+
+ protected MessageExchange[] createParts(MessageExchange exchange) throws Exception {
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ Source[] srcParts = split(in.getContent());
+ MessageExchange[] parts = new MessageExchange[srcParts.length];
+ for (int i = 0; i < srcParts.length; i++) {
+ parts[i] = createPart(exchange.getPattern(), in, srcParts[i]);
+ }
+ return parts;
+ }
+
+ protected MessageExchange createPart(URI pattern,
+ NormalizedMessage srcMessage,
+ Source content) throws Exception {
+ MessageExchange me = exchangeFactory.createExchange(pattern);
+ NormalizedMessage in = me.createMessage();
+ in.setContent(content);
+ me.setMessage(in, "in");
+ if (forwardAttachments) {
+ Set names = srcMessage.getAttachmentNames();
+ for (Iterator iter = names.iterator(); iter.hasNext();) {
+ String name = (String) iter.next();
+ in.addAttachment(name, srcMessage.getAttachment(name));
+ }
+ }
+ if (forwardProperties) {
+ Set names = srcMessage.getPropertyNames();
+ for (Iterator iter = names.iterator(); iter.hasNext();) {
+ String name = (String) iter.next();
+ in.setProperty(name, srcMessage.getProperty(name));
+ }
+ }
+ return me;
+ }
+
+ protected abstract Source[] split(Source main) throws Exception;
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/ExchangeTarget.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,123 @@
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.springframework.beans.factory.InitializingBean;
+
+/**
+ * An ExchangeTarget may be used to specify the target of an exchange,
+ * while retaining all the JBI features (interface based routing, service
+ * name based routing or endpoint routing).
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="exchange-target"
+ */
+public class ExchangeTarget implements InitializingBean {
+
+ private QName _interface;
+
+ private QName operation;
+
+ private QName service;
+
+ private String endpoint;
+
+ /**
+ * @return Returns the endpointName.
+ */
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ /**
+ * @param endpointName
+ * The endpointName to set.
+ */
+ public void setEndpoint(String endpointName) {
+ this.endpoint = endpointName;
+ }
+
+ /**
+ * @return Returns the interface name.
+ */
+ public QName getInterface() {
+ return _interface;
+ }
+
+ /**
+ * @param interface name
+ * The interface name to set.
+ */
+ public void setInterface(QName _interface) {
+ this._interface = _interface;
+ }
+
+ /**
+ * @return Returns the operation name.
+ */
+ public QName getOperation() {
+ return operation;
+ }
+
+ /**
+ * @param operation
+ * The operation to set.
+ */
+ public void setOperation(QName operation) {
+ this.operation = operation;
+ }
+
+ /**
+ * @return Returns the serviceName.
+ */
+ public QName getService() {
+ return service;
+ }
+
+ /**
+ * @param serviceName
+ * The serviceName to set.
+ */
+ public void setService(QName serviceName) {
+ this.service = serviceName;
+ }
+
+ /**
+ * Configures the target on the newly created exchange
+ * @param exchange the exchange to configure
+ * @throws MessagingException if the target could not be configured
+ */
+ public void configureTarget(MessageExchange exchange, ComponentContext context) throws MessagingException {
+ if (_interface == null && service == null) {
+ throw new MessagingException("interfaceName or serviceName should be specified");
+ }
+ if (_interface != null) {
+ exchange.setInterfaceName(_interface);
+ }
+ if (operation != null) {
+ exchange.setOperation(operation);
+ }
+ if (service != null) {
+ exchange.setService(service);
+ if (endpoint != null) {
+ ServiceEndpoint se = context.getEndpoint(service, endpoint);
+ exchange.setEndpoint(se);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+ */
+ public void afterPropertiesSet() throws Exception {
+ if (_interface == null && service == null) {
+ throw new MessagingException("interfaceName or serviceName should be specified");
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.security.auth.Subject;
+import javax.xml.transform.Source;
+
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.ByteArrayDataSource;
+import org.apache.servicemix.jbi.util.FileUtil;
+
+/**
+ * Static helpers to copy normalized messages and to transfer them between
+ * message exchanges.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public class MessageUtil {
+
+    /**
+     * Transfers the content, properties, attachments and security subject
+     * of <code>source</code> onto <code>dest</code>. The values themselves
+     * are shared, not duplicated.
+     *
+     * @param source the message to read from
+     * @param dest the message to populate
+     * @throws Exception if <code>dest</code> rejects a value
+     */
+    public static void transfer(NormalizedMessage source, NormalizedMessage dest) throws Exception {
+        dest.setContent(source.getContent());
+        for (Iterator it = source.getPropertyNames().iterator(); it.hasNext();) {
+            String name = (String) it.next();
+            dest.setProperty(name, source.getProperty(name));
+        }
+        for (Iterator it = source.getAttachmentNames().iterator(); it.hasNext();) {
+            String name = (String) it.next();
+            dest.addAttachment(name, source.getAttachment(name));
+        }
+        dest.setSecuritySubject(source.getSecuritySubject());
+    }
+
+    /**
+     * Creates a standalone copy of the given message. A {@link Fault} is
+     * copied to a message that is itself a {@link Fault}.
+     *
+     * @param source the message to copy
+     * @return the copy
+     * @throws Exception if the content or an attachment cannot be read
+     */
+    public static NormalizedMessage copy(NormalizedMessage source) throws Exception {
+        if (source instanceof Fault) {
+            return new FaultImpl((Fault) source);
+        } else {
+            return new NormalizedMessageImpl(source);
+        }
+    }
+
+    /**
+     * @return a copy of the "in" message of the exchange
+     */
+    public static NormalizedMessage copyIn(MessageExchange exchange) throws Exception {
+        return copy(exchange.getMessage("in"));
+    }
+
+    /**
+     * @return a copy of the "out" message of the exchange
+     */
+    public static NormalizedMessage copyOut(MessageExchange exchange) throws Exception {
+        return copy(exchange.getMessage("out"));
+    }
+
+    /**
+     * @return a copy of the "fault" message of the exchange
+     */
+    public static Fault copyFault(MessageExchange exchange) throws Exception {
+        return (Fault) copy(exchange.getMessage("fault"));
+    }
+
+    /**
+     * Transfers the "in" message of <code>source</code> to the "in" message of <code>dest</code>.
+     */
+    public static void transferInToIn(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToIn(source.getMessage("in"), dest);
+    }
+
+    /**
+     * Transfers the "out" message of <code>source</code> to the "in" message of <code>dest</code>.
+     */
+    public static void transferOutToIn(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToIn(source.getMessage("out"), dest);
+    }
+
+    /**
+     * Transfers the given message to the "in" message of <code>dest</code>.
+     */
+    public static void transferToIn(NormalizedMessage sourceMsg, MessageExchange dest) throws Exception {
+        transferTo(sourceMsg, dest, "in");
+    }
+
+    /**
+     * Transfers the "out" message of <code>source</code> to the "out" message of <code>dest</code>.
+     */
+    public static void transferOutToOut(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToOut(source.getMessage("out"), dest);
+    }
+
+    /**
+     * Transfers the "in" message of <code>source</code> to the "out" message of <code>dest</code>.
+     */
+    public static void transferInToOut(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToOut(source.getMessage("in"), dest);
+    }
+
+    /**
+     * Transfers the given message to the "out" message of <code>dest</code>.
+     */
+    public static void transferToOut(NormalizedMessage sourceMsg, MessageExchange dest) throws Exception {
+        transferTo(sourceMsg, dest, "out");
+    }
+
+    /**
+     * Transfers the fault of <code>source</code> to the "fault" message of <code>dest</code>.
+     */
+    public static void transferFaultToFault(MessageExchange source, MessageExchange dest) throws Exception {
+        transferToFault(source.getFault(), dest);
+    }
+
+    /**
+     * Transfers the given fault to the "fault" message of <code>dest</code>.
+     */
+    public static void transferToFault(Fault fault, MessageExchange dest) throws Exception {
+        transferTo(fault, dest, "fault");
+    }
+
+    /**
+     * Creates a new message (or a new fault when <code>sourceMsg</code> is
+     * a {@link Fault}) on <code>dest</code>, populates it from
+     * <code>sourceMsg</code> and sets it on <code>dest</code> under the
+     * given name.
+     *
+     * @param sourceMsg the message to transfer
+     * @param dest the exchange receiving the message
+     * @param name the name of the message on <code>dest</code>
+     * @throws Exception if the message cannot be created, populated or set
+     */
+    public static void transferTo(NormalizedMessage sourceMsg, MessageExchange dest, String name) throws Exception {
+        NormalizedMessage destMsg = (sourceMsg instanceof Fault) ? dest.createFault() : dest.createMessage();
+        transfer(sourceMsg, destMsg);
+        dest.setMessage(destMsg, name);
+    }
+
+    /**
+     * Transfers the message called <code>name</code> on <code>source</code>
+     * to the message of the same name on <code>dest</code>.
+     *
+     * @param source the exchange holding the message to transfer
+     * @param dest the exchange receiving the message
+     * @param name the name of the message on both exchanges
+     * @throws Exception if the message cannot be created, populated or set
+     */
+    public static void transferTo(MessageExchange source, MessageExchange dest, String name) throws Exception {
+        // Same steps as the message based overload, so delegate to it
+        transferTo(source.getMessage(name), dest, name);
+    }
+
+    /**
+     * Map backed message used to hold a copy of another message.
+     */
+    private static class NormalizedMessageImpl implements NormalizedMessage {
+
+        private Subject subject;
+        private Source content;
+        private Map properties = new HashMap();
+        private Map attachments = new HashMap();
+
+        /**
+         * Copies the given message: the content is serialised to a string,
+         * properties are copied by reference, and attachments that are not
+         * already backed by a byte array are read into one. The security
+         * subject is shared with the original.
+         *
+         * @param message the message to copy
+         * @throws Exception if the content or an attachment cannot be read
+         */
+        public NormalizedMessageImpl(NormalizedMessage message) throws Exception {
+            // A message may carry no content at all; keep the copy's content
+            // null rather than relying on the serialisation helpers
+            // accepting a null source.
+            if (message.getContent() != null) {
+                this.content = new StringSource(new SourceTransformer().contentToString(message));
+            }
+            for (Iterator it = message.getPropertyNames().iterator(); it.hasNext();) {
+                String name = (String) it.next();
+                this.properties.put(name, message.getProperty(name));
+            }
+            for (Iterator it = message.getAttachmentNames().iterator(); it.hasNext();) {
+                String name = (String) it.next();
+                DataHandler dh = message.getAttachment(name);
+                DataSource ds = dh.getDataSource();
+                // Read any other kind of data source fully into memory
+                if (ds instanceof ByteArrayDataSource == false) {
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    FileUtil.copyInputStream(ds.getInputStream(), baos);
+                    ByteArrayDataSource bads = new ByteArrayDataSource(baos.toByteArray(), ds.getContentType());
+                    bads.setName(ds.getName());
+                    dh = new DataHandler(bads);
+                }
+                this.attachments.put(name, dh);
+            }
+            this.subject = message.getSecuritySubject();
+        }
+
+        public void addAttachment(String id, DataHandler content) throws MessagingException {
+            this.attachments.put(id, content);
+        }
+
+        public Source getContent() {
+            return content;
+        }
+
+        public DataHandler getAttachment(String id) {
+            return (DataHandler) this.attachments.get(id);
+        }
+
+        public Set getAttachmentNames() {
+            return this.attachments.keySet();
+        }
+
+        public void removeAttachment(String id) throws MessagingException {
+            this.attachments.remove(id);
+        }
+
+        public void setContent(Source content) throws MessagingException {
+            this.content = content;
+        }
+
+        public void setProperty(String name, Object value) {
+            this.properties.put(name, value);
+        }
+
+        public void setSecuritySubject(Subject subject) {
+            this.subject = subject;
+        }
+
+        public Set getPropertyNames() {
+            return this.properties.keySet();
+        }
+
+        public Object getProperty(String name) {
+            return this.properties.get(name);
+        }
+
+        public Subject getSecuritySubject() {
+            return this.subject;
+        }
+
+    }
+
+    /**
+     * Copy of a fault; only adds the {@link Fault} marker to the message copy.
+     */
+    private static class FaultImpl extends NormalizedMessageImpl implements Fault {
+        public FaultImpl(Fault fault) throws Exception {
+            super(fault);
+        }
+    }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/Predicate.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.messaging.MessageExchange;
+
+/**
+ * A condition evaluated against a message exchange.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public interface Predicate {
+
+    /**
+     * Tests the given exchange against this predicate.
+     *
+     * @param exchange the exchange to test
+     * @return <code>true</code> if the exchange matches
+     */
+    boolean matches(MessageExchange exchange);
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/RoutingRule.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+/**
+ * The RoutingRule class is used by content based routers.
+ * It pairs a predicate with a target: if the predicate matches the
+ * MessageExchange, the target defined on the rule will be used as the
+ * destination for the given MessageExchange.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="routing-rule"
+ */
+public class RoutingRule {
+
+    private Predicate predicate;
+    private ExchangeTarget target;
+
+    /**
+     * Creates an empty rule; predicate and target are set afterwards.
+     */
+    public RoutingRule() {
+    }
+
+    /**
+     * Creates a fully configured rule.
+     *
+     * @param predicate the predicate of the rule
+     * @param target the target of the rule
+     */
+    public RoutingRule(Predicate predicate, ExchangeTarget target) {
+        this.target = target;
+        this.predicate = predicate;
+    }
+
+    /**
+     * @return Returns the predicate.
+     */
+    public Predicate getPredicate() {
+        return predicate;
+    }
+
+    /**
+     * @param predicate The predicate to set.
+     */
+    public void setPredicate(Predicate predicate) {
+        this.predicate = predicate;
+    }
+
+    /**
+     * @return Returns the target.
+     */
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target The target to set.
+     */
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/XPathPredicate.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,42 @@
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.expression.JAXPBooleanXPathExpression;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.w3c.dom.Node;
+
+/**
+ * A {@link Predicate} backed by a boolean XPath expression, evaluated
+ * against a copy of the "in" message of the exchange.
+ *
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="xpath-predicate"
+ */
+public class XPathPredicate extends JAXPBooleanXPathExpression implements Predicate {
+
+    private static final Log log = LogFactory.getLog(XPathPredicate.class);
+
+    /**
+     * Creates a predicate whose XPath expression is configured afterwards.
+     */
+    public XPathPredicate() {
+    }
+
+    /**
+     * Creates a predicate for the given XPath expression.
+     *
+     * @param xpath the expression, passed on to the superclass constructor
+     * @throws Exception if the superclass constructor fails
+     */
+    public XPathPredicate(String xpath) throws Exception {
+        super(xpath);
+    }
+
+    /**
+     * Evaluates the expression on a copy of the "in" message.
+     *
+     * @param exchange the exchange to test
+     * @return <code>true</code> only if the evaluation yields
+     *         <code>Boolean.TRUE</code>; <code>false</code> otherwise,
+     *         including when the evaluation fails, in which case a
+     *         warning is logged
+     */
+    public boolean matches(MessageExchange exchange) {
+        try {
+            NormalizedMessage inCopy = MessageUtil.copyIn(exchange);
+            // The cast is kept on purpose: a non Boolean result ends up
+            // in the catch block below
+            Boolean outcome = (Boolean) evaluate(exchange, inCopy);
+            return Boolean.TRUE.equals(outcome);
+        } catch (Exception failure) {
+            log.warn("Could not evaluate xpath expression", failure);
+            return false;
+        }
+    }
+
+}
Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java?rev=394227&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java Fri Apr 14 16:38:22 2006
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import java.io.ByteArrayInputStream;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+/**
+ * Common fixture for the EIP test cases. Every test runs against a freshly
+ * started embedded {@link JBIContainer}; the helper methods activate endpoints
+ * and components in it, and the nested components reply to active exchanges
+ * with a canned out message, fault or error.
+ */
+public abstract class AbstractEIPTest extends TestCase {
+
+    protected JBIContainer jbi;
+    protected DefaultServiceMixClient client;
+    protected ExchangeCompletedListener listener;
+
+    /**
+     * Creates, configures and starts the container, registers the listener
+     * that {@link #tearDown()} checks, and creates the client.
+     */
+    protected void setUp() throws Exception {
+        jbi = new JBIContainer();
+        jbi.setEmbedded(true);
+        jbi.setUseMBeanServer(false);
+        configureContainer();
+        listener = new ExchangeCompletedListener();
+        jbi.addListener(listener);
+
+        jbi.init();
+        jbi.start();
+
+        client = new DefaultServiceMixClient(jbi);
+
+        //LogManager.getLogger(DeliveryChannel.class).setLevel(Level.OFF);
+    }
+
+    /**
+     * Runs the listener's <code>assertExchangeCompleted()</code> check and
+     * shuts the container down. The shutdown sits in a <code>finally</code>
+     * block so that a failing check cannot leave a started container behind
+     * for the tests that follow.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            listener.assertExchangeCompleted();
+        } finally {
+            jbi.shutDown();
+        }
+    }
+
+    /**
+     * Configuration hook called by {@link #setUp()} before the container is
+     * initialised. This default selects the flow named "st".
+     */
+    protected void configureContainer() throws Exception {
+        jbi.setFlowName("st");
+    }
+
+    /**
+     * Builds an exchange target that addresses the given service.
+     *
+     * @param name qualified name of the target service
+     * @return a new target with only its service set
+     */
+    protected ExchangeTarget createServiceExchangeTarget(QName name) {
+        ExchangeTarget target = new ExchangeTarget();
+        target.setService(name);
+        return target;
+    }
+
+    /**
+     * Creates a {@link ReceiverComponent} and activates it under the given name.
+     *
+     * @param name name handed on to <code>activateComponent</code>
+     * @return the activated receiver
+     */
+    protected ReceiverComponent activateReceiver(String name) throws Exception {
+        ReceiverComponent receiver = new ReceiverComponent();
+        activateComponent(receiver, name);
+        return receiver;
+    }
+
+    /**
+     * Wraps the endpoint in a new {@link EIPSpringComponent} and activates
+     * that component in the container.
+     *
+     * @param endpoint endpoint to expose; its service is set to
+     *                 <code>new QName(name)</code> and its endpoint name to "ep"
+     * @param name     service local name, also used as the component name
+     */
+    protected void activateComponent(EIPEndpoint endpoint, String name) throws Exception {
+        EIPSpringComponent eip = new EIPSpringComponent();
+        endpoint.setService(new QName(name));
+        endpoint.setEndpoint("ep");
+        eip.setEndpoints(new EIPEndpoint[] { endpoint });
+        jbi.activateComponent(eip, name);
+    }
+
+    /**
+     * Activates the component in the container after giving it the service
+     * <code>new QName(name)</code> and the endpoint name "ep".
+     *
+     * @param component component to activate
+     * @param name      service local name, also used as the component name
+     */
+    protected void activateComponent(ComponentSupport component, String name) throws Exception {
+        component.setService(new QName(name));
+        component.setEndpoint("ep");
+        jbi.activateComponent(component, name);
+    }
+
+    /**
+     * Wraps the given XML text in a stream-based {@link Source}.
+     * The text is encoded as UTF-8 explicitly instead of with the platform
+     * default charset: a document without an encoding declaration is read
+     * by XML parsers as UTF-8, so the bytes must match that.
+     *
+     * @param msg XML document text
+     * @return a source reading the UTF-8 bytes of <code>msg</code>
+     */
+    protected static Source createSource(String msg) {
+        try {
+            return new StreamSource(new ByteArrayInputStream(msg.getBytes("UTF-8")));
+        } catch (java.io.UnsupportedEncodingException e) {
+            // Not expected: every Java platform is required to support UTF-8.
+            IllegalStateException failure = new IllegalStateException("UTF-8 encoding is not available");
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Component driven by the state of an active exchange: it answers with
+     * <code>&lt;outMsg/&gt;</code> while there is no "out" message, then fails
+     * with a <code>&lt;fault/&gt;</code> fault while there is no fault, and
+     * otherwise calls <code>done(exchange)</code>.
+     */
+    protected static class ReturnOutComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (exchange.getMessage("out") == null) {
+                    NormalizedMessage out = exchange.createMessage();
+                    out.setContent(createSource("<outMsg/>"));
+                    answer(exchange, out);
+                } else if (exchange.getFault() == null) {
+                    Fault fault = exchange.createFault();
+                    fault.setContent(createSource("<fault/>"));
+                    fail(exchange, fault);
+                } else {
+                    done(exchange);
+                }
+            }
+        }
+    }
+
+    /**
+     * Component that answers every active exchange with the XML text given
+     * at construction time.
+     */
+    protected static class ReturnMockComponent extends ComponentSupport implements MessageExchangeListener {
+        private final String response;
+        public ReturnMockComponent(String response) {
+            this.response = response;
+        }
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                NormalizedMessage out = exchange.createMessage();
+                out.setContent(createSource(response));
+                answer(exchange, out);
+            }
+        }
+    }
+
+    /**
+     * Component that answers an active exchange with
+     * <code>&lt;outMsg/&gt;</code> while there is no "out" message and fails
+     * it with a "Dummy error" exception afterwards.
+     */
+    protected static class ReturnOutAndErrorComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (exchange.getMessage("out") == null) {
+                    NormalizedMessage out = exchange.createMessage();
+                    out.setContent(createSource("<outMsg/>"));
+                    answer(exchange, out);
+                } else {
+                    fail(exchange, new Exception("Dummy error"));
+                }
+            }
+        }
+    }
+
+    /**
+     * Component that fails every active exchange with a "Dummy error" exception.
+     */
+    protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, new Exception("Dummy error"));
+            }
+        }
+    }
+
+    /**
+     * Component that fails every active exchange with a <code>&lt;fault/&gt;</code> fault.
+     */
+    protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                Fault fault = exchange.createFault();
+                fault.setContent(createSource("<fault/>"));
+                fail(exchange, fault);
+            }
+        }
+    }
+
+}