You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/12/19 12:10:03 UTC

svn commit: r488633 - in /incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src: main/java/org/apache/servicemix/eip/patterns/Pipeline.java test/java/org/apache/servicemix/eip/PipelineTest.java

Author: gnodet
Date: Tue Dec 19 03:10:02 2006
New Revision: 488633

URL: http://svn.apache.org/viewvc?view=rev&rev=488633
Log:
SM-775: The EIP pipeline should have another target for faults

Modified:
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java

Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java?view=diff&rev=488633&r1=488632&r2=488633
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java Tue Dec 19 03:10:02 2006
@@ -62,6 +62,18 @@
      * The address of the target endpoint
      */
     private ExchangeTarget target;
+    
+    /**
+     * The address of the endpoint to send faults to
+     */
+    private ExchangeTarget faultsTarget;
+    
+    /**
+     * When the faultsTarget is not specified,
+     * faults may be sent to the target endpoint
+     * if this flag is set to <code>true</code>
+     */
+    private boolean sendFaultsToTarget = false;
 
     /**
      * The correlation property used by this component
@@ -93,6 +105,34 @@
     }
 
     /**
+     * @return the faultsTarget
+     */
+    public ExchangeTarget getFaultsTarget() {
+        return faultsTarget;
+    }
+
+    /**
+     * @param faultsTarget the faultsTarget to set
+     */
+    public void setFaultsTarget(ExchangeTarget faultsTarget) {
+        this.faultsTarget = faultsTarget;
+    }
+
+    /**
+     * @return the sendFaultsToTarget
+     */
+    public boolean isSendFaultsToTarget() {
+        return sendFaultsToTarget;
+    }
+
+    /**
+     * @param sendFaultsToTarget the sendFaultsToTarget to set
+     */
+    public void setSendFaultsToTarget(boolean sendFaultsToTarget) {
+        this.sendFaultsToTarget = sendFaultsToTarget;
+    }
+
+    /**
      * @return Returns the transformer.
      */
     public ExchangeTarget getTransformer() {
@@ -148,19 +188,48 @@
         else if (tme.getStatus() == ExchangeStatus.ERROR) {
             fail(exchange, tme.getError());
         }
-        // Faults must be sent back to the consumer
         else if (tme.getFault() != null) {
-            if (exchange instanceof InOnly) {
-                // Do not use the fault has it may contain streams
-                // So just transform it to a string and send an error
-                String fault = new SourceTransformer().contentToString(tme.getFault());
+            // Faults must be sent to the target / faultsTarget
+            if (faultsTarget != null || sendFaultsToTarget) {
+                MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+                (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
+                MessageUtil.transferToIn(tme.getFault(), me);
+                sendSync(me);
                 done(tme);
-                fail(exchange, new FaultException(fault, null, null));
+                if (me.getStatus() == ExchangeStatus.DONE) {
+                    done(exchange);
+                } else if (me.getStatus() == ExchangeStatus.ERROR) {
+                    fail(exchange, me.getError());
+                } else if (me.getFault() != null) {
+                    if (exchange instanceof InOnly) {
+                        // Do not use the fault as it may contain streams
+                        // So just transform it to a string and send an error
+                        String fault = new SourceTransformer().contentToString(me.getFault());
+                        done(me);
+                        fail(exchange, new FaultException(fault, null, null));
+                    } else {
+                        Fault fault = MessageUtil.copyFault(me);
+                        MessageUtil.transferToFault(fault, exchange);
+                        done(me);
+                        sendSync(exchange);
+                    }
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+                }
+            // Faults must be sent back to the consumer
             } else {
-                Fault fault = MessageUtil.copyFault(tme);
-                MessageUtil.transferToFault(fault, exchange);
-                done(tme);
-                sendSync(exchange);
+                if (exchange instanceof InOnly) {
+                    // Do not use the fault as it may contain streams
+                    // So just transform it to a string and send an error
+                    String fault = new SourceTransformer().contentToString(tme.getFault());
+                    done(tme);
+                    fail(exchange, new FaultException(fault, null, null));
+                } else {
+                    Fault fault = MessageUtil.copyFault(tme);
+                    MessageUtil.transferToFault(fault, exchange);
+                    done(tme);
+                    sendSync(exchange);
+                }
             }
         // This should not happen
         } else if (tme.getOutMessage() == null) {
@@ -259,19 +328,35 @@
             } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                 MessageExchange me = (MessageExchange) store.load(consumerId);
                 fail(me, exchange.getError());
-            // Faults must be sent back to the consumer
             } else if (exchange.getFault() != null) {
-                MessageExchange me = (MessageExchange) store.load(consumerId);
-                if (me instanceof InOnly) {
-                    // Do not use the fault has it may contain streams
-                    // So just transform it to a string and send an error
-                    String fault = new SourceTransformer().contentToString(exchange.getFault());
-                    fail(me, new FaultException(fault, null, null));
-                    done(exchange);
-                } else {
+                // Faults must be sent to faultsTarget / target
+                if (faultsTarget != null || sendFaultsToTarget) {
+                    // Retrieve the consumer MEP
+                    URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
+                    if (mep == null) {
+                        throw new IllegalStateException("Exchange does not carry the consumer MEP");
+                    }
+                    MessageExchange me = exchangeFactory.createExchange(mep);
+                    (faultsTarget != null ? faultsTarget : target).configureTarget(me, getContext());
+                    me.setProperty(correlationConsumer, consumerId);
+                    me.setProperty(correlationTransformer, exchange.getExchangeId());
                     store.store(exchange.getExchangeId(), exchange);
-                    MessageUtil.transferFaultToFault(exchange, me);
+                    MessageUtil.transferToIn(exchange.getFault(), me);
                     send(me);
+                // Faults must be sent back to the consumer
+                } else {
+                    MessageExchange me = (MessageExchange) store.load(consumerId);
+                    if (me instanceof InOnly) {
+                        // Do not use the fault as it may contain streams
+                        // So just transform it to a string and send an error
+                        String fault = new SourceTransformer().contentToString(exchange.getFault());
+                        fail(me, new FaultException(fault, null, null));
+                        done(exchange);
+                    } else {
+                        store.store(exchange.getExchangeId(), exchange);
+                        MessageUtil.transferFaultToFault(exchange, me);
+                        send(me);
+                    }
                 }
             // This is the answer from the transformer
             } else if (exchange.getMessage("out") != null) {

Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java?view=diff&rev=488633&r1=488632&r2=488633
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTest.java Tue Dec 19 03:10:02 2006
@@ -86,6 +86,22 @@
         listener.assertExchangeCompleted();
     }
     
+    public void testInOnlyWithTransformerFaultSentToTarget() throws Exception {
+        pipeline.setSendFaultsToTarget(true);
+        activateComponent(new ReturnFaultComponent(), "transformer");
+        ReceiverComponent target = activateReceiver("target");
+
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("pipeline"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        target.getMessageList().assertMessagesReceived(1);
+        
+        listener.assertExchangeCompleted();
+    }
+    
     public void testInOnlyWithTransformerError() throws Exception {
         activateComponent(new ReturnErrorComponent(), "transformer");
         ReceiverComponent target = activateReceiver("target");
@@ -142,6 +158,22 @@
         client.done(me);
         
         target.getMessageList().assertMessagesReceived(0);
+        
+        listener.assertExchangeCompleted();
+    }
+    
+    public void testRobustInOnlyWithTransformerFaultSentToTarget() throws Exception {
+        pipeline.setSendFaultsToTarget(true);
+        activateComponent(new ReturnFaultComponent(), "transformer");
+        ReceiverComponent target = activateReceiver("target");
+
+        RobustInOnly me = client.createRobustInOnlyExchange();
+        me.setService(new QName("pipeline"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        target.getMessageList().assertMessagesReceived(1);
         
         listener.assertExchangeCompleted();
     }