You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/06/28 02:37:52 UTC
svn commit: r1140389 -
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
Author: charith
Date: Tue Jun 28 00:37:52 2011
New Revision: 1140389
URL: http://svn.apache.org/viewvc?rev=1140389&view=rev
Log:
Fixing a bug in scheduled message forwarding processir when configured with parameter max.deliver.attempts
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java?rev=1140389&r1=1140388&r2=1140389&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java Tue Jun 28 00:37:52 2011
@@ -65,6 +65,8 @@ public class ForwardingJob implements St
int maxDeliverAttempts = -1;
String mdaParam = (String) parameters.get(MessageProcessorConsents.MAX_DELIVER_ATTEMPTS);
+
+
if (mdaParam != null) {
maxDeliverAttempts = Integer.parseInt(mdaParam);
@@ -80,10 +82,6 @@ public class ForwardingJob implements St
return;
}
- if (maxDeliverAttempts > 0) {
- processor.incrementSendAttemptCount();
- }
-
boolean errorStop = false;
while (!errorStop) {
@@ -134,6 +132,11 @@ public class ForwardingJob implements St
if (outCtx != null && "true".equals(outCtx.
getProperty(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR))) {
// This Means an Error has occurred
+
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
+ }
+
if (parameters != null &&
parameters.get(
ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
@@ -151,7 +154,7 @@ public class ForwardingJob implements St
}
if (maxDeliverAttempts > 0) {
- if(processor.getSendAttemptCount() > maxDeliverAttempts) {
+ if(processor.getSendAttemptCount() >= maxDeliverAttempts) {
processor.deactivate();
}
}
@@ -160,7 +163,10 @@ public class ForwardingJob implements St
} else if(outCtx == null) {
// This Means we have invoked an out only operation
+ // remove the message and reset the count
messageStore.poll();
+ processor.resetSentAttemptCount();
+ continue;
}
// If there is a sequence defined to send success replies,
@@ -182,10 +188,14 @@ public class ForwardingJob implements St
}
// If no Exception Occurred We remove the Message
+ // and reset the delivery attempt count
+ processor.resetSentAttemptCount();
messageStore.poll();
} catch (Exception e) {
+
if (maxDeliverAttempts > 0) {
- if (processor.getSendAttemptCount() > maxDeliverAttempts) {
+ processor.incrementSendAttemptCount();
+ if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
processor.deactivate();
}
}
@@ -214,11 +224,6 @@ public class ForwardingJob implements St
}
} else {
- if (maxDeliverAttempts > 0) {
- if (processor.getSendAttemptCount() > maxDeliverAttempts) {
- processor.deactivate();
- }
- }
errorStop = true;
}
}