You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2010/11/24 15:15:05 UTC
svn commit: r1038612 -
/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
Author: lhein
Date: Wed Nov 24 14:15:05 2010
New Revision: 1038612
URL: http://svn.apache.org/viewvc?rev=1038612&view=rev
Log:
Fixed a problem where the splitter set the DONE state of the original exchange one part message too early (see SM-2013).
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=1038612&r1=1038611&r2=1038612&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java Wed Nov 24 14:15:05 2010
@@ -244,41 +244,44 @@ public abstract class AbstractSplitter e
if (exchange.getStatus() == ExchangeStatus.DONE) {
// If the acks integer is not here anymore, the message response has been sent already
if (acks != null) {
- if (acks + 1 >= count) {
+ acks++;
+ if (acks < count) {
MessageExchange me = (MessageExchange) store.load(corrId);
done(me);
} else {
- store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ store.store(corrId + ".acks", Integer.valueOf(acks));
removeLock = false;
}
}
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
// If the acks integer is not here anymore, the message response has been sent already
if (acks != null) {
+ acks++;
if (reportErrors) {
MessageExchange me = (MessageExchange) store.load(corrId);
fail(me, exchange.getError());
- } else if (acks + 1 >= count) {
+ } else if (acks < count) {
MessageExchange me = (MessageExchange) store.load(corrId);
done(me);
} else {
- store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ store.store(corrId + ".acks", Integer.valueOf(acks));
removeLock = false;
}
}
} else if (exchange.getFault() != null) {
// If the acks integer is not here anymore, the message response has been sent already
if (acks != null) {
+ acks++;
if (reportErrors) {
MessageExchange me = (MessageExchange) store.load(corrId);
MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
send(me);
done(exchange);
- } else if (acks + 1 >= count) {
+ } else if (acks < count) {
MessageExchange me = (MessageExchange) store.load(corrId);
done(me);
} else {
- store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ store.store(corrId + ".acks", Integer.valueOf(acks));
removeLock = false;
}
} else {
@@ -312,7 +315,8 @@ public abstract class AbstractSplitter e
target.configureTarget(parts[i], getContext());
send(parts[i]);
}
- done(exchange);
+ // do not done the exchange on provider side!
+ //done(exchange);
}
}
}