You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/05 17:33:40 UTC
svn commit: r692464 -
/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Author: gnodet
Date: Fri Sep 5 08:33:39 2008
New Revision: 692464
URL: http://svn.apache.org/viewvc?rev=692464&view=rev
Log:
SM-1548: SMX-EIP AbstractAggregator timeout exchange has bad correlationId
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=692464&r1=692463&r2=692464&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Fri Sep 5 08:33:39 2008
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.jbi.util.MessageUtil;
import org.apache.servicemix.store.Store;
@@ -57,17 +58,20 @@
private static final Log LOG = LogFactory.getLog(AbstractAggregator.class);
private ExchangeTarget target;
-
+
private boolean rescheduleTimeouts;
-
+
private boolean synchronous;
private Store closedAggregates;
private StoreFactory closedAggregatesStoreFactory;
- private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
-
+ private boolean copyProperties = true;
+ private boolean copyAttachments = true;
+
+ private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
+
/**
* @return the synchronous
*/
@@ -109,6 +113,29 @@
public void setTarget(ExchangeTarget target) {
this.target = target;
}
+
+ public boolean isCopyProperties() {
+ return copyProperties;
+ }
+
+ public void setCopyProperties(boolean copyProperties) {
+ this.copyProperties = copyProperties;
+ }
+
+ public boolean isCopyAttachments() {
+ return copyAttachments;
+ }
+
+ public void setCopyAttachments(boolean copyAttachments) {
+ this.copyAttachments = copyAttachments;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processSync(MessageExchange exchange) throws Exception {
+ throw new IllegalStateException();
+ }
/**
* Access the currently configured {@link StoreFactory} for storing closed aggregations
@@ -129,13 +156,6 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
- */
- protected void processSync(MessageExchange exchange) throws Exception {
- throw new IllegalStateException();
- }
-
- /* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
*/
protected void processAsync(MessageExchange exchange) throws Exception {
@@ -158,10 +178,10 @@
// Skip DONE
if (exchange.getStatus() == ExchangeStatus.DONE) {
return;
- // Skip ERROR
+ // Skip ERROR
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
return;
- // Handle an ACTIVE exchange as a PROVIDER
+ // Handle an ACTIVE exchange as a PROVIDER
} else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
if (!(exchange instanceof InOnly)
&& !(exchange instanceof RobustInOnly)) {
@@ -169,13 +189,15 @@
} else {
processProvider(exchange);
}
- // Handle an ACTIVE exchange as a CONSUMER
+ // Handle an ACTIVE exchange as a CONSUMER
} else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
done(exchange);
}
}
private void processProvider(MessageExchange exchange) throws Exception {
+ final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+
NormalizedMessage in = MessageUtil.copyIn(exchange);
final String correlationId = getCorrelationID(exchange, in);
if (correlationId == null || correlationId.length() == 0) {
@@ -196,20 +218,12 @@
timeout = getTimeout(aggregation);
}
} else if (isRescheduleTimeouts()) {
- Timer t = timers.remove(correlationId);
- if (t != null) {
- t.cancel();
- }
timeout = getTimeout(aggregation);
}
// If the aggregation is not closed
if (aggregation != null) {
if (addMessage(aggregation, in, exchange)) {
- Timer t = timers.remove(correlationId);
- if (t != null) {
- t.cancel();
- }
- sendAggregate(correlationId, aggregation, false);
+ sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
} else {
store.store(correlationId, aggregation);
if (timeout != null) {
@@ -218,7 +232,7 @@
}
Timer t = getTimerManager().schedule(new TimerListener() {
public void timerExpired(Timer timer) {
- AbstractAggregator.this.onTimeout(correlationId, timer);
+ AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
}
}, timeout);
timers.put(correlationId, t);
@@ -231,23 +245,28 @@
}
}
- protected void sendAggregate(String correlationId,
+ protected void sendAggregate(String processCorrelationId,
+ String correlationId,
Object aggregation,
- boolean timeout) throws Exception {
+ boolean timeout,
+ boolean sync) throws Exception {
InOnly me = getExchangeFactory().createInOnlyExchange();
+ if (processCorrelationId != null) {
+ me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
+ }
target.configureTarget(me, getContext());
NormalizedMessage nm = me.createMessage();
me.setInMessage(nm);
buildAggregate(aggregation, nm, me, timeout);
closeAggregation(correlationId);
- if (isSynchronous()) {
+ if (sync) {
sendSync(me);
} else {
send(me);
}
}
- protected void onTimeout(String correlationId, Timer timer) {
+ protected void onTimeout(String processCorrelationId, String correlationId, Timer timer) {
if (LOG.isDebugEnabled()) {
LOG.debug("Timeout expired for aggregate " + correlationId);
}
@@ -255,14 +274,14 @@
lock.lock();
try {
// the timeout event could have been fired before timer was canceled
- Timer t = getTimer(correlationId);
+ Timer t = timers.get(correlationId);
if (t == null || !t.equals(timer)) {
return;
}
timers.remove(correlationId);
Object aggregation = store.load(correlationId);
if (aggregation != null) {
- sendAggregate(correlationId, aggregation, true);
+ sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
} else if (!isAggregationClosed(correlationId)) {
throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
} else {
@@ -303,15 +322,10 @@
// TODO: implement this using a persistent / cached behavior
closedAggregates.store(correlationId, Boolean.TRUE);
}
-
- /**
- * Get the time-out timer for an active aggregation
- *
- * @param correlationId
- * @return
- */
- protected Timer getTimer(String correlationId) {
- return timers.get(correlationId);
+
+ private boolean isSynchronous(MessageExchange exchange) {
+ return isSynchronous()
+ || (exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)));
}
/**