You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/03 15:00:39 UTC
svn commit: r710057 - in
/servicemix/components/engines/servicemix-eip/trunk/src:
main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Author: gnodet
Date: Mon Nov 3 06:00:39 2008
New Revision: 710057
URL: http://svn.apache.org/viewvc?rev=710057&view=rev
Log:
SM-1549: servicemix-eip AbstractAggregator should support boolean property 'reportTimeoutsAsErrors'
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=710057&r1=710056&r2=710057&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Nov 3 06:00:39 2008
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import javax.jbi.messaging.ExchangeStatus;
@@ -77,6 +78,8 @@
private boolean reportClosedAggregatesAsErrors = false;
+ private boolean reportTimeoutAsErrors;
+
private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
/**
@@ -171,6 +174,26 @@
public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
}
+
+ /**
+ * Sets whether the aggregator should report errors on incoming exchanges already received when
+ * a timeout occurs.
+ * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+ * with a DONE status.
+ *
+ * @param reportTimeoutAsErrors <code>boolean</code> indicating if exchanges received prior to a
+ * timeout should be sent back with an ERROR status
+ */
+ public void setReportTimeoutAsErrors(boolean reportTimeoutAsErrors) {
+ this.reportTimeoutAsErrors = reportTimeoutAsErrors;
+ }
+
+ /**
+ * @return <code>true</code> if exchanges received prior to a timeout are sent back with an ERROR status
+ */
+ public boolean isReportTimeoutAsErrors() {
+ return reportTimeoutAsErrors;
+ }
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
@@ -211,6 +234,10 @@
closedAggregatesStoreFactory = new MemoryStoreFactory();
}
closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+ if (reportTimeoutAsErrors && !reportErrors) {
+ throw new IllegalArgumentException(
+ "ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
+ }
}
/* (non-Javadoc)
@@ -353,7 +380,20 @@
timers.remove(correlationId);
Object aggregation = store.load(correlationId);
if (aggregation != null) {
- sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+ if (reportTimeoutAsErrors) {
+ List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+ if (exchanges != null) {
+ TimeoutException timeoutException = new TimeoutException();
+ for (MessageExchange me : exchanges) {
+ me.setError(timeoutException);
+ me.setStatus(ExchangeStatus.ERROR);
+ send(me);
+ }
+ }
+ closeAggregation(correlationId);
+ } else {
+ sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+ }
} else if (!isAggregationClosed(correlationId)) {
throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
} else {
Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=710057&r1=710056&r2=710057&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java Mon Nov 3 06:00:39 2008
@@ -18,6 +18,7 @@
import java.util.concurrent.atomic.AtomicReference;
+import javax.jbi.JBIException;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
@@ -28,6 +29,7 @@
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.eip.patterns.SplitAggregator;
import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.tck.ReceiverComponent;
public class SplitAggregatorTest extends AbstractEIPTest {
@@ -44,10 +46,11 @@
activateComponent(aggregator, "aggregator");
}
- protected NormalizedMessage testRun(boolean[] msgs) throws Exception {
+ protected NormalizedMessage testRun(boolean[] msgs, boolean reportTimeoutAsErrors) throws Exception {
ReceiverComponent rec = activateReceiver("target");
int nbMessages = 3;
+ int nbMessagesSent = 0;
String corrId = Long.toString(System.currentTimeMillis());
for (int i = 0; i < 3; i++) {
if (msgs == null || msgs[i]) {
@@ -59,33 +62,67 @@
me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
me.getInMessage().setProperty("prop", "value");
client.send(me);
+ nbMessagesSent++;
}
}
- rec.getMessageList().assertMessagesReceived(1);
- NormalizedMessage msg = (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
- assertEquals("value", msg.getProperty("prop"));
+ NormalizedMessage msg = null;
+ if (reportTimeoutAsErrors && (nbMessages != nbMessagesSent)) {
+ for (int i = 0; i < nbMessagesSent; i++) {
+ MessageExchange me = (InOnly)client.receive();
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ }
+ } else {
+ rec.getMessageList().assertMessagesReceived(1);
+ msg = (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+ int nbElements = new SourceTransformer().toDOMElement(msg).getChildNodes().getLength();
+ assertEquals(nbMessagesSent, nbElements);
+ assertEquals("value", msg.getProperty("prop"));
+ }
return msg;
}
+
+ public void testWithoutReportErrorsAndWithReportTimeoutAsErrors() {
+ SplitAggregator aggr = new SplitAggregator();
+ aggr.setTarget(createServiceExchangeTarget(new QName("target")));
+ aggr.setReportErrors(false);
+ aggr.setReportTimeoutAsErrors(true);
+ configurePattern(aggr);
+ try {
+ activateComponent(aggr, "aggr");
+ fail("An IllegalArgumentException should have been thrown!");
+ } catch (Exception e) {
+ assertTrue(e instanceof JBIException);
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
+ }
public void testSimple() throws Exception {
- aggregator.setTimeout(500);
- testRun(null);
+ aggregator.setTimeout(5000);
+ testRun(null, false);
}
public void testSimpleWithQNames() throws Exception {
aggregator.setAggregateElementName(new QName("uri:test", "agg", "sm"));
aggregator.setMessageElementName(new QName("uri:test", "msg", "sm"));
- testRun(null);
+ testRun(null, false);
}
public void testWithTimeout() throws Exception {
aggregator.setTimeout(500);
- testRun(new boolean[] {true, false, true });
+ testRun(new boolean[] {true, false, true }, false);
+ }
+
+ public void testWithTimeoutReportedAsErrors() throws Exception {
+ aggregator.setTimeout(500);
+ aggregator.setReportErrors(true);
+ aggregator.setReportTimeoutAsErrors(true);
+ testRun(new boolean[] {true, false, true }, true);
}
public void testProcessCorrelationIdPropagationWithTimeout() throws Exception {
aggregator.setTimeout(500);
+ aggregator.setReportTimeoutAsErrors(false);
final AtomicReference<String> receivedCorrId = new AtomicReference<String>();