You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2010/11/24 15:15:05 UTC

svn commit: r1038612 - /servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java

Author: lhein
Date: Wed Nov 24 14:15:05 2010
New Revision: 1038612

URL: http://svn.apache.org/viewvc?rev=1038612&view=rev
Log:
Fixed a problem where the splitter set the DONE state of the original exchange one part message too early (see SM-2013).

Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=1038612&r1=1038611&r2=1038612&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java Wed Nov 24 14:15:05 2010
@@ -244,41 +244,44 @@ public abstract class AbstractSplitter e
                 if (exchange.getStatus() == ExchangeStatus.DONE) {
                     // If the acks integer is not here anymore, the message response has been sent already
                     if (acks != null) {
-                        if (acks + 1 >= count) {
+                        acks++;
+                        if (acks < count) {
                             MessageExchange me = (MessageExchange) store.load(corrId);
                             done(me);
                         } else {
-                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                            store.store(corrId + ".acks", Integer.valueOf(acks));
                             removeLock = false;
                         }
                     }
                 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                     // If the acks integer is not here anymore, the message response has been sent already
                     if (acks != null) {
+                        acks++;
                         if (reportErrors) {
                             MessageExchange me = (MessageExchange) store.load(corrId);
                             fail(me, exchange.getError());
-                        } else  if (acks + 1 >= count) {
+                        } else  if (acks < count) {
                             MessageExchange me = (MessageExchange) store.load(corrId);
                             done(me);
                         } else {
-                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                            store.store(corrId + ".acks", Integer.valueOf(acks));
                             removeLock = false;
                         }
                     }
                 } else if (exchange.getFault() != null) {
                     // If the acks integer is not here anymore, the message response has been sent already
                     if (acks != null) {
+                        acks++;
                         if (reportErrors) {
                             MessageExchange me = (MessageExchange) store.load(corrId);
                             MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
                             send(me);
                             done(exchange);
-                        } else  if (acks + 1 >= count) {
+                        } else  if (acks < count) {
                             MessageExchange me = (MessageExchange) store.load(corrId);
                             done(me);
                         } else {
-                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                            store.store(corrId + ".acks", Integer.valueOf(acks));
                             removeLock = false;
                         }
                     } else {
@@ -312,7 +315,8 @@ public abstract class AbstractSplitter e
                     target.configureTarget(parts[i], getContext());
                     send(parts[i]);
                 }
-                done(exchange);
+                // do not mark the exchange as done on the provider side!
+                //done(exchange);
             }
         }
     }