You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/05 17:33:40 UTC

svn commit: r692464 - /servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Author: gnodet
Date: Fri Sep  5 08:33:39 2008
New Revision: 692464

URL: http://svn.apache.org/viewvc?rev=692464&view=rev
Log:
SM-1548: SMX-EIP AbstractAggregator timeout exchange has bad correlationId

Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=692464&r1=692463&r2=692464&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Fri Sep  5 08:33:39 2008
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.jbi.util.MessageUtil;
 import org.apache.servicemix.store.Store;
@@ -57,17 +58,20 @@
     private static final Log LOG = LogFactory.getLog(AbstractAggregator.class);
 
     private ExchangeTarget target;
-
+    
     private boolean rescheduleTimeouts;
-
+    
     private boolean synchronous;
 
     private Store closedAggregates;
     private StoreFactory closedAggregatesStoreFactory;
 
-    private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
-
+    private boolean copyProperties = true;
 
+    private boolean copyAttachments = true;
+    
+    private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
+    
     /**
      * @return the synchronous
      */
@@ -109,6 +113,29 @@
     public void setTarget(ExchangeTarget target) {
         this.target = target;
     }
+
+    public boolean isCopyProperties() {
+        return copyProperties;
+    }
+
+    public void setCopyProperties(boolean copyProperties) {
+        this.copyProperties = copyProperties;
+    }
+
+    public boolean isCopyAttachments() {
+        return copyAttachments;
+    }
+
+    public void setCopyAttachments(boolean copyAttachments) {
+        this.copyAttachments = copyAttachments;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processSync(MessageExchange exchange) throws Exception {
+        throw new IllegalStateException();
+    }
     
     /**
      * Access the currently configured {@link StoreFactory} for storing closed aggregations
@@ -129,13 +156,6 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
-     */
-    protected void processSync(MessageExchange exchange) throws Exception {
-        throw new IllegalStateException();
-    }
-
-    /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
      */
     protected void processAsync(MessageExchange exchange) throws Exception {
@@ -158,10 +178,10 @@
         // Skip DONE
         if (exchange.getStatus() == ExchangeStatus.DONE) {
             return;
-            // Skip ERROR
+        // Skip ERROR
         } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
             return;
-            // Handle an ACTIVE exchange as a PROVIDER
+        // Handle an ACTIVE exchange as a PROVIDER
         } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
             if (!(exchange instanceof InOnly)
                 && !(exchange instanceof RobustInOnly)) {
@@ -169,13 +189,15 @@
             } else {
                 processProvider(exchange);
             }
-            // Handle an ACTIVE exchange as a CONSUMER
+        // Handle an ACTIVE exchange as a CONSUMER
         } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
             done(exchange);
         }
     }
 
     private void processProvider(MessageExchange exchange) throws Exception {
+        final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+
         NormalizedMessage in = MessageUtil.copyIn(exchange);
         final String correlationId = getCorrelationID(exchange, in);
         if (correlationId == null || correlationId.length() == 0) {
@@ -196,20 +218,12 @@
                     timeout = getTimeout(aggregation);
                 }
             } else if (isRescheduleTimeouts()) {
-                Timer t = timers.remove(correlationId);
-                if (t != null) {
-                    t.cancel();
-                }
                 timeout = getTimeout(aggregation);
             }
             // If the aggregation is not closed
             if (aggregation != null) {
                 if (addMessage(aggregation, in, exchange)) {
-                    Timer t = timers.remove(correlationId);
-                    if (t != null) {
-                        t.cancel();
-                    }
-                    sendAggregate(correlationId, aggregation, false);
+                    sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
                 } else {
                     store.store(correlationId, aggregation);
                     if (timeout != null) {
@@ -218,7 +232,7 @@
                         }
                         Timer t = getTimerManager().schedule(new TimerListener() {
                             public void timerExpired(Timer timer) {
-                                AbstractAggregator.this.onTimeout(correlationId, timer);
+                                AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
                             }
                         }, timeout);
                         timers.put(correlationId, t);
@@ -231,23 +245,28 @@
         }
     }
 
-    protected void sendAggregate(String correlationId,
+    protected void sendAggregate(String processCorrelationId,
+                                 String correlationId,
                                  Object aggregation,
-                                 boolean timeout) throws Exception {
+                                 boolean timeout,
+                                 boolean sync) throws Exception {
         InOnly me = getExchangeFactory().createInOnlyExchange();
+        if (processCorrelationId != null) {
+            me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
+        }
         target.configureTarget(me, getContext());
         NormalizedMessage nm = me.createMessage();
         me.setInMessage(nm);
         buildAggregate(aggregation, nm, me, timeout);
         closeAggregation(correlationId);
-        if (isSynchronous()) {
+        if (sync) {
             sendSync(me);
         } else {
             send(me);
         }
     }
 
-    protected void onTimeout(String correlationId, Timer timer) {
+    protected void onTimeout(String processCorrelationId, String correlationId, Timer timer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Timeout expired for aggregate " + correlationId);
         }
@@ -255,14 +274,14 @@
         lock.lock();
         try {
             // the timeout event could have been fired before timer was canceled
-            Timer t = getTimer(correlationId);
+            Timer t = timers.get(correlationId);
             if (t == null || !t.equals(timer)) {
                 return;
             }
             timers.remove(correlationId);
             Object aggregation = store.load(correlationId);
             if (aggregation != null) {
-                sendAggregate(correlationId, aggregation, true);
+                sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
             } else if (!isAggregationClosed(correlationId)) {
                 throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
             } else {
@@ -303,15 +322,10 @@
         // TODO: implement this using a persistent / cached behavior
         closedAggregates.store(correlationId, Boolean.TRUE);
     }
-    
-    /**
-     * Get the time-out timer for an active aggregation
-     * 
-     * @param correlationId
-     * @return 
-     */
-    protected Timer getTimer(String correlationId) {
-        return timers.get(correlationId); 
+
+    private boolean isSynchronous(MessageExchange exchange) {
+        return isSynchronous()
+                || (exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)));
     }
 
     /**