You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by jb...@apache.org on 2011/01/31 09:11:55 UTC

svn commit: r1065506 - /servicemix/components/trunk/engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java

Author: jbonofre
Date: Mon Jan 31 08:11:55 2011
New Revision: 1065506

URL: http://svn.apache.org/viewvc?rev=1065506&view=rev
Log:
[SMXCOMP-768] Improve async behavior in MessageFilter patterns of the ServiceMix EIP component.

Modified:
    servicemix/components/trunk/engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java

Modified: servicemix/components/trunk/engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java?rev=1065506&r1=1065505&r2=1065506&view=diff
==============================================================================
--- servicemix/components/trunk/engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java (original)
+++ servicemix/components/trunk/engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java Mon Jan 31 08:11:55 2011
@@ -62,7 +62,7 @@ public class MessageFilter extends EIPEn
      * a fault or error is reported, and the exchange will be kept in the
      * store for recovery. 
      */
-    private boolean reportErrors;
+    private boolean reportErrors = true;
     
     /**
      * @return Returns the target.
@@ -167,33 +167,34 @@ public class MessageFilter extends EIPEn
      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
      */
     protected void processAsync(MessageExchange exchange) throws Exception {
-        // If we need to report errors, the behavior is really different,
-        // as we need to keep the incoming exchange in the store until
-        // all acks have been received
-        if (reportErrors) {
-            // TODO: implement this
-            throw new UnsupportedOperationException("Not implemented");
-        // We are in a simple fire-and-forget behaviour.
-        // This implementation is really efficient as we do not use
-        // the store at all.
+        if (exchange.getStatus() == ExchangeStatus.DONE) {
+            MessageExchange me = (MessageExchange) store.load(exchange.getExchangeId());
+            done(me);
+        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            MessageExchange me = (MessageExchange) store.load(exchange.getExchangeId());
+            if (reportErrors) {
+                fail(me, exchange.getError());
+            } else {
+                done(me);
+            }
+        } else if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
+            fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+        } else if (exchange.getFault() != null) {
+            MessageExchange me = (MessageExchange) store.load(exchange.getExchangeId());
+            if (reportErrors) {
+                MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
+                send(me);
+            }
+            done(exchange);
         } else {
-            if (exchange.getStatus() == ExchangeStatus.DONE) {
-                return;
-            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                return;
-            } else if (!(exchange instanceof InOnly)
-                       && !(exchange instanceof RobustInOnly)) {
-                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-            } else if (exchange.getFault() != null) {
-                done(exchange);
+            NormalizedMessage in = MessageUtil.copyIn(exchange);
+            MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
+            target.configureTarget(me, getContext());
+            MessageUtil.transferToIn(in, me);
+            if (filter.matches(me)) {
+                store.store(me.getExchangeId(), exchange);
+                send(me);
             } else {
-                NormalizedMessage in = MessageUtil.copyIn(exchange);
-                MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
-                target.configureTarget(me, getContext());
-                MessageUtil.transferToIn(in, me);
-                if (filter.matches(me)) {
-                    send(me);
-                }
                 done(exchange);
             }
         }