You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/03 15:32:29 UTC
svn commit: r710067 - in
/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src:
main/java/org/apache/servicemix/eip/patterns/
main/java/org/apache/servicemix/eip/support/
test/java/org/apache/servicemix/eip/
Author: gnodet
Date: Mon Nov 3 06:32:28 2008
New Revision: 710067
URL: http://svn.apache.org/viewvc?rev=710067&view=rev
Log:
Backport SM-1549 and SM-1411
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java?rev=710067&r1=710066&r2=710067&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java Mon Nov 3 06:32:28 2008
@@ -252,6 +252,12 @@
} else {
root.appendChild(doc.importNode(elem, true));
}
+ if (isCopyProperties()) {
+ copyProperties(messages[i], message);
+ }
+ if (isCopyAttachments()) {
+ copyAttachments(messages[i], message);
+ }
}
}
message.setContent(new DOMSource(doc));
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=710067&r1=710066&r2=710067&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Nov 3 06:32:28 2008
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import javax.jbi.messaging.ExchangeStatus;
@@ -76,6 +77,8 @@
private boolean reportClosedAggregatesAsErrors;
+ private boolean reportTimeoutAsErrors;
+
private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
/**
@@ -170,6 +173,26 @@
public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
}
+
+ /**
+ * Sets whether the aggregator should report errors on incoming exchanges already received when
+ * a timeout occurs.
+ * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+ * with a DONE status.
+ *
+ * @param reportTimeoutAsErrors <code>boolean</code> indicating if exchanges received prior to a
+ * timeout should be sent back with an ERROR status
+ */
+ public void setReportTimeoutAsErrors(boolean reportTimeoutAsErrors) {
+ this.reportTimeoutAsErrors = reportTimeoutAsErrors;
+ }
+
+ /**
+ * @return <code>true</code> if exchanges received prior to a timeout are sent back with an ERROR status
+ */
+ public boolean isReportTimeoutAsErrors() {
+ return reportTimeoutAsErrors;
+ }
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
@@ -210,6 +233,10 @@
closedAggregatesStoreFactory = new MemoryStoreFactory();
}
closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+ if (reportTimeoutAsErrors && !reportErrors) {
+ throw new IllegalArgumentException(
+ "ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
+ }
}
/* (non-Javadoc)
@@ -352,7 +379,20 @@
timers.remove(correlationId);
Object aggregation = store.load(correlationId);
if (aggregation != null) {
- sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+ if (reportTimeoutAsErrors) {
+ List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+ if (exchanges != null) {
+ TimeoutException timeoutException = new TimeoutException();
+ for (MessageExchange me : exchanges) {
+ me.setError(timeoutException);
+ me.setStatus(ExchangeStatus.ERROR);
+ send(me);
+ }
+ }
+ closeAggregation(correlationId);
+ } else {
+ sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+ }
} else if (!isAggregationClosed(correlationId)) {
throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
} else {
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=710067&r1=710066&r2=710067&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java Mon Nov 3 06:32:28 2008
@@ -16,12 +16,20 @@
*/
package org.apache.servicemix.eip;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.eip.patterns.SplitAggregator;
import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.tck.ReceiverComponent;
public class SplitAggregatorTest extends AbstractEIPTest {
@@ -33,14 +41,16 @@
aggregator = new SplitAggregator();
aggregator.setTarget(createServiceExchangeTarget(new QName("target")));
+ aggregator.setCopyProperties(true);
configurePattern(aggregator);
activateComponent(aggregator, "aggregator");
}
- protected NormalizedMessage testRun(boolean[] msgs) throws Exception {
+ protected NormalizedMessage testRun(boolean[] msgs, boolean reportTimeoutAsErrors) throws Exception {
ReceiverComponent rec = activateReceiver("target");
int nbMessages = 3;
+ int nbMessagesSent = 0;
String corrId = Long.toString(System.currentTimeMillis());
for (int i = 0; i < 3; i++) {
if (msgs == null || msgs[i]) {
@@ -50,27 +60,109 @@
me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(nbMessages));
me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(i));
me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+ me.getInMessage().setProperty("prop", "value");
client.send(me);
+ nbMessagesSent++;
}
}
- rec.getMessageList().assertMessagesReceived(1);
- return (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+ NormalizedMessage msg = null;
+ if (reportTimeoutAsErrors && (nbMessages != nbMessagesSent)) {
+ for (int i = 0; i < nbMessagesSent; i++) {
+ MessageExchange me = (InOnly)client.receive();
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ }
+ } else {
+ rec.getMessageList().assertMessagesReceived(1);
+ msg = (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+ int nbElements = new SourceTransformer().toDOMElement(msg).getChildNodes().getLength();
+ assertEquals(nbMessagesSent, nbElements);
+ assertEquals("value", msg.getProperty("prop"));
+ }
+ return msg;
+ }
+
+ public void testWithoutReportErrorsAndWithReportTimeoutAsErrors() {
+ SplitAggregator aggr = new SplitAggregator();
+ aggr.setTarget(createServiceExchangeTarget(new QName("target")));
+ aggr.setReportErrors(false);
+ aggr.setReportTimeoutAsErrors(true);
+ configurePattern(aggr);
+ try {
+ activateComponent(aggr, "aggr");
+ fail("An IllegalArgumentException should have been thrown!");
+ } catch (Exception e) {
+ assertTrue(e instanceof JBIException);
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
}
public void testSimple() throws Exception {
- aggregator.setTimeout(500);
- testRun(null);
+ aggregator.setTimeout(5000);
+ testRun(null, false);
}
public void testSimpleWithQNames() throws Exception {
aggregator.setAggregateElementName(new QName("uri:test", "agg", "sm"));
aggregator.setMessageElementName(new QName("uri:test", "msg", "sm"));
- testRun(null);
+ testRun(null, false);
}
public void testWithTimeout() throws Exception {
aggregator.setTimeout(500);
- testRun(new boolean[] {true, false, true });
+ testRun(new boolean[] {true, false, true }, false);
+ }
+
+ public void testWithTimeoutReportedAsErrors() throws Exception {
+ aggregator.setTimeout(500);
+ aggregator.setReportErrors(true);
+ aggregator.setReportTimeoutAsErrors(true);
+ testRun(new boolean[] {true, false, true }, true);
+ }
+
+ public void testProcessCorrelationIdPropagationWithTimeout() throws Exception {
+ aggregator.setTimeout(500);
+ aggregator.setReportTimeoutAsErrors(false);
+
+ final AtomicReference<String> receivedCorrId = new AtomicReference<String>();
+
+ final String processCorrId = Long.toString(System.currentTimeMillis());
+ ReceiverComponent rec = new ReceiverComponent() {
+ @Override
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ String corrId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+ receivedCorrId.set(corrId);
+ super.onMessageExchange(exchange);
+ }
+ };
+ activateComponent(rec, "target");
+
+ String corrId = Long.toString(System.currentTimeMillis());
+ InOnly me = client.createInOnlyExchange();
+ me.setProperty(JbiConstants.CORRELATION_ID, processCorrId);
+ me.setService(new QName("aggregator"));
+ me.getInMessage().setContent(createSource("<hello id='" + 0 + "' />"));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(2));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(0));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+ client.send(me);
+
+ rec.getMessageList().waitForMessagesToArrive(1);
+ rec.getMessageList().flushMessages();
+ assertEquals(processCorrId, receivedCorrId.get());
+
+ me = client.createInOnlyExchange();
+ me.setProperty(JbiConstants.CORRELATION_ID, processCorrId);
+ me.setService(new QName("aggregator"));
+ me.getInMessage().setContent(createSource("<hello id='" + 0 + "' />"));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(2));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(1));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+ client.sendSync(me);
+
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+ Thread.sleep(500);
+ rec.getMessageList().assertMessagesReceived(0);
}
}