You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/05 14:35:42 UTC
svn commit: r692422 - in
/servicemix/components/engines/servicemix-eip/trunk/src:
main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Author: gnodet
Date: Fri Sep 5 05:35:41 2008
New Revision: 692422
URL: http://svn.apache.org/viewvc?rev=692422&view=rev
Log:
SM-1548: SMX-EIP AbstractAggregator timeout exchange has bad correlationId
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=692422&r1=692421&r2=692422&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Fri Sep 5 05:35:41 2008
@@ -129,7 +129,9 @@
public void setCopyAttachments(boolean copyAttachments) {
this.copyAttachments = copyAttachments;
- }/* (non-Javadoc)
+ }
+
+ /* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
protected void processSync(MessageExchange exchange) throws Exception {
@@ -195,6 +197,8 @@
}
private void processProvider(MessageExchange exchange) throws Exception {
+ final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+
NormalizedMessage in = MessageUtil.copyIn(exchange);
final String correlationId = getCorrelationID(exchange, in);
if (correlationId == null || correlationId.length() == 0) {
@@ -220,7 +224,7 @@
// If the aggregation is not closed
if (aggregation != null) {
if (addMessage(aggregation, in, exchange)) {
- sendAggregate(correlationId, aggregation, false, isSynchronous(exchange));
+ sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
} else {
store.store(correlationId, aggregation);
if (timeout != null) {
@@ -229,7 +233,7 @@
}
Timer t = getTimerManager().schedule(new TimerListener() {
public void timerExpired(Timer timer) {
- AbstractAggregator.this.onTimeout(correlationId, timer);
+ AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
}
}, timeout);
timers.put(correlationId, t);
@@ -242,11 +246,15 @@
}
}
- protected void sendAggregate(String correlationId,
+ protected void sendAggregate(String processCorrelationId,
+ String correlationId,
Object aggregation,
boolean timeout,
boolean sync) throws Exception {
InOnly me = getExchangeFactory().createInOnlyExchange();
+ if (processCorrelationId != null) {
+ me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
+ }
target.configureTarget(me, getContext());
NormalizedMessage nm = me.createMessage();
me.setInMessage(nm);
@@ -259,7 +267,7 @@
}
}
- protected void onTimeout(String correlationId, Timer timer) {
+ protected void onTimeout(String processCorrelationId, String correlationId, Timer timer) {
if (LOG.isDebugEnabled()) {
LOG.debug("Timeout expired for aggregate " + correlationId);
}
@@ -269,12 +277,12 @@
// the timeout event could have been fired before timer was canceled
Timer t = timers.get(correlationId);
if (t == null || !t.equals(timer)) {
- return;
+ return;
}
timers.remove(correlationId);
Object aggregation = store.load(correlationId);
if (aggregation != null) {
- sendAggregate(correlationId, aggregation, true, isSynchronous());
+ sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
} else if (!isAggregationClosed(correlationId)) {
throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
} else {
Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=692422&r1=692421&r2=692422&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java Fri Sep 5 05:35:41 2008
@@ -16,10 +16,15 @@
*/
package org.apache.servicemix.eip;
+import java.util.concurrent.atomic.AtomicReference;
+
import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.eip.patterns.SplitAggregator;
import org.apache.servicemix.eip.support.AbstractSplitter;
import org.apache.servicemix.tck.ReceiverComponent;
@@ -77,4 +82,34 @@
aggregator.setTimeout(500);
testRun(new boolean[] {true, false, true });
}
+
+ public void testProcessCorrelationIdPropagationWithTimeout() throws Exception {
+ aggregator.setTimeout(500);
+
+ final AtomicReference<String> receivedCorrId = new AtomicReference<String>();
+
+ final String processCorrId = Long.toString(System.currentTimeMillis());
+ ReceiverComponent rec = new ReceiverComponent() {
+ @Override
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ String corrId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+ receivedCorrId.set(corrId);
+ super.onMessageExchange(exchange);
+ }
+ };
+ activateComponent(rec, "target");
+
+ String corrId = Long.toString(System.currentTimeMillis());
+ InOnly me = client.createInOnlyExchange();
+ me.setProperty(JbiConstants.CORRELATION_ID, processCorrId);
+ me.setService(new QName("aggregator"));
+ me.getInMessage().setContent(createSource("<hello id='" + 0 + "' />"));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(2));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(0));
+ me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+ client.send(me);
+
+ rec.getMessageList().waitForMessagesToArrive(1);
+ assertEquals(processCorrId, receivedCorrId.get());
+ }
}