You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/12/19 12:10:03 UTC
svn commit: r488633 - in
/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src:
main/java/org/apache/servicemix/eip/patterns/Pipeline.java
test/java/org/apache/servicemix/eip/PipelineTest.java
Author: gnodet
Date: Tue Dec 19 03:10:02 2006
New Revision: 488633
URL: http://svn.apache.org/viewvc?view=rev&rev=488633
Log:
SM-775: The EIP pipeline should have another target for faults
Modified:
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java?view=diff&rev=488633&r1=488632&r2=488633
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java Tue Dec 19 03:10:02 2006
@@ -62,6 +62,18 @@
* The address of the target endpoint
*/
private ExchangeTarget target;
+
+ /**
+ * The address of the endpoint to send faults to
+ */
+ private ExchangeTarget faultsTarget;
+
+ /**
+ * When the faultsTarget is not specified,
+ * faults may be sent to the target endpoint
+ * if this flag is set to <code>true</code>
+ */
+ private boolean sendFaultsToTarget = false;
/**
* The correlation property used by this component
@@ -93,6 +105,34 @@
}
/**
+ * @return the faultsTarget
+ */
+ public ExchangeTarget getFaultsTarget() {
+ return faultsTarget;
+ }
+
+ /**
+ * @param faultsTarget the faultsTarget to set
+ */
+ public void setFaultsTarget(ExchangeTarget faultsTarget) {
+ this.faultsTarget = faultsTarget;
+ }
+
+ /**
+ * @return the sendFaultsToTarget
+ */
+ public boolean isSendFaultsToTarget() {
+ return sendFaultsToTarget;
+ }
+
+ /**
+ * @param sendFaultsToTarget the sendFaultsToTarget to set
+ */
+ public void setSendFaultsToTarget(boolean sendFaultsToTarget) {
+ this.sendFaultsToTarget = sendFaultsToTarget;
+ }
+
+ /**
* @return Returns the transformer.
*/
public ExchangeTarget getTransformer() {
@@ -148,19 +188,48 @@
else if (tme.getStatus() == ExchangeStatus.ERROR) {
fail(exchange, tme.getError());
}
- // Faults must be sent back to the consumer
else if (tme.getFault() != null) {
- if (exchange instanceof InOnly) {
- // Do not use the fault has it may contain streams
- // So just transform it to a string and send an error
- String fault = new SourceTransformer().contentToString(tme.getFault());
+ // Faults must be sent to the target / faultsTarget
+ if (faultsTarget != null || sendFaultsToTarget) {
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
+ MessageUtil.transferToIn(tme.getFault(), me);
+ sendSync(me);
done(tme);
- fail(exchange, new FaultException(fault, null, null));
+ if (me.getStatus() == ExchangeStatus.DONE) {
+ done(exchange);
+ } else if (me.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, me.getError());
+ } else if (me.getFault() != null) {
+ if (exchange instanceof InOnly) {
+ // Do not use the fault as it may contain streams
+ // So just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(me.getFault());
+ done(me);
+ fail(exchange, new FaultException(fault, null, null));
+ } else {
+ Fault fault = MessageUtil.copyFault(me);
+ MessageUtil.transferToFault(fault, exchange);
+ done(me);
+ sendSync(exchange);
+ }
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+ }
+ // Faults must be sent back to the consumer
} else {
- Fault fault = MessageUtil.copyFault(tme);
- MessageUtil.transferToFault(fault, exchange);
- done(tme);
- sendSync(exchange);
+ if (exchange instanceof InOnly) {
+ // Do not use the fault as it may contain streams
+ // So just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(tme.getFault());
+ done(tme);
+ fail(exchange, new FaultException(fault, null, null));
+ } else {
+ Fault fault = MessageUtil.copyFault(tme);
+ MessageUtil.transferToFault(fault, exchange);
+ done(tme);
+ sendSync(exchange);
+ }
}
// This should not happen
} else if (tme.getOutMessage() == null) {
@@ -259,19 +328,35 @@
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
MessageExchange me = (MessageExchange) store.load(consumerId);
fail(me, exchange.getError());
- // Faults must be sent back to the consumer
} else if (exchange.getFault() != null) {
- MessageExchange me = (MessageExchange) store.load(consumerId);
- if (me instanceof InOnly) {
- // Do not use the fault has it may contain streams
- // So just transform it to a string and send an error
- String fault = new SourceTransformer().contentToString(exchange.getFault());
- fail(me, new FaultException(fault, null, null));
- done(exchange);
- } else {
+ // Faults must be sent to faultsTarget / target
+ if (faultsTarget != null || sendFaultsToTarget) {
+ // Retrieve the consumer MEP
+ URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
+ if (mep == null) {
+ throw new IllegalStateException("Exchange does not carry the consumer MEP");
+ }
+ MessageExchange me = exchangeFactory.createExchange(mep);
+ (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
+ me.setProperty(correlationConsumer, consumerId);
+ me.setProperty(correlationTransformer, exchange.getExchangeId());
store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferFaultToFault(exchange, me);
+ MessageUtil.transferToIn(exchange.getFault(), me);
send(me);
+ // Faults must be sent back to the consumer
+ } else {
+ MessageExchange me = (MessageExchange) store.load(consumerId);
+ if (me instanceof InOnly) {
+ // Do not use the fault as it may contain streams
+ // So just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(exchange.getFault());
+ fail(me, new FaultException(fault, null, null));
+ done(exchange);
+ } else {
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferFaultToFault(exchange, me);
+ send(me);
+ }
}
// This is the answer from the transformer
} else if (exchange.getMessage("out") != null) {
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java?view=diff&rev=488633&r1=488632&r2=488633
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java Tue Dec 19 03:10:02 2006
@@ -86,6 +86,22 @@
listener.assertExchangeCompleted();
}
+ public void testInOnlyWithTransformerFaultSentToTarget() throws Exception {
+ pipeline.setSendFaultsToTarget(true);
+ activateComponent(new ReturnFaultComponent(), "transformer");
+ ReceiverComponent target = activateReceiver("target");
+
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("pipeline"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+ target.getMessageList().assertMessagesReceived(1);
+
+ listener.assertExchangeCompleted();
+ }
+
public void testInOnlyWithTransformerError() throws Exception {
activateComponent(new ReturnErrorComponent(), "transformer");
ReceiverComponent target = activateReceiver("target");
@@ -142,6 +158,22 @@
client.done(me);
target.getMessageList().assertMessagesReceived(0);
+
+ listener.assertExchangeCompleted();
+ }
+
+ public void testRobustInOnlyWithTransformerFaultSentToTarget() throws Exception {
+ pipeline.setSendFaultsToTarget(true);
+ activateComponent(new ReturnFaultComponent(), "transformer");
+ ReceiverComponent target = activateReceiver("target");
+
+ RobustInOnly me = client.createRobustInOnlyExchange();
+ me.setService(new QName("pipeline"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+ target.getMessageList().assertMessagesReceived(1);
listener.assertExchangeCompleted();
}