You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/05 14:35:42 UTC

svn commit: r692422 - in /servicemix/components/engines/servicemix-eip/trunk/src: main/java/org/apache/servicemix/eip/support/AbstractAggregator.java test/java/org/apache/servicemix/eip/SplitAggregatorTest.java

Author: gnodet
Date: Fri Sep  5 05:35:41 2008
New Revision: 692422

URL: http://svn.apache.org/viewvc?rev=692422&view=rev
Log:
SM-1548: SMX-EIP AbstractAggregator timeout exchange has bad correlationId

Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=692422&r1=692421&r2=692422&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Fri Sep  5 05:35:41 2008
@@ -129,7 +129,9 @@
 
     public void setCopyAttachments(boolean copyAttachments) {
         this.copyAttachments = copyAttachments;
-    }/* (non-Javadoc)
+    }
+    
+    /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
     protected void processSync(MessageExchange exchange) throws Exception {
@@ -195,6 +197,8 @@
     }
 
     private void processProvider(MessageExchange exchange) throws Exception {
+        final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+
         NormalizedMessage in = MessageUtil.copyIn(exchange);
         final String correlationId = getCorrelationID(exchange, in);
         if (correlationId == null || correlationId.length() == 0) {
@@ -220,7 +224,7 @@
             // If the aggregation is not closed
             if (aggregation != null) {
                 if (addMessage(aggregation, in, exchange)) {
-                    sendAggregate(correlationId, aggregation, false, isSynchronous(exchange));
+                    sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
                 } else {
                     store.store(correlationId, aggregation);
                     if (timeout != null) {
@@ -229,7 +233,7 @@
                         }
                         Timer t = getTimerManager().schedule(new TimerListener() {
                             public void timerExpired(Timer timer) {
-                                AbstractAggregator.this.onTimeout(correlationId, timer);
+                                AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
                             }
                         }, timeout);
                         timers.put(correlationId, t);
@@ -242,11 +246,15 @@
         }
     }
 
-    protected void sendAggregate(String correlationId,
+    protected void sendAggregate(String processCorrelationId,
+                                 String correlationId,
                                  Object aggregation,
                                  boolean timeout,
                                  boolean sync) throws Exception {
         InOnly me = getExchangeFactory().createInOnlyExchange();
+        if (processCorrelationId != null) {
+            me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
+        }
         target.configureTarget(me, getContext());
         NormalizedMessage nm = me.createMessage();
         me.setInMessage(nm);
@@ -259,7 +267,7 @@
         }
     }
 
-    protected void onTimeout(String correlationId, Timer timer) {
+    protected void onTimeout(String processCorrelationId, String correlationId, Timer timer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Timeout expired for aggregate " + correlationId);
         }
@@ -269,12 +277,12 @@
             // the timeout event could have been fired before timer was canceled
             Timer t = timers.get(correlationId);
             if (t == null || !t.equals(timer)) {
-		return;
+                return;
             }
             timers.remove(correlationId);
             Object aggregation = store.load(correlationId);
             if (aggregation != null) {
-                sendAggregate(correlationId, aggregation, true, isSynchronous());
+                sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
             } else if (!isAggregationClosed(correlationId)) {
                 throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
             } else {

Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=692422&r1=692421&r2=692422&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java Fri Sep  5 05:35:41 2008
@@ -16,10 +16,15 @@
  */
 package org.apache.servicemix.eip;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.eip.patterns.SplitAggregator;
 import org.apache.servicemix.eip.support.AbstractSplitter;
 import org.apache.servicemix.tck.ReceiverComponent;
@@ -77,4 +82,34 @@
         aggregator.setTimeout(500);
         testRun(new boolean[] {true, false, true });
     }
+    
+    /** Checks that the exchange received by "target" carries the sender's JBI correlation id. */
+    public void testProcessCorrelationIdPropagationWithTimeout() throws Exception {
+        aggregator.setTimeout(500);
+        final AtomicReference<String> receivedCorrId = new AtomicReference<String>();
+        // Distinct prefixes: two bare timestamps are usually equal and would hide a mix-up of ids.
+        final String processCorrId = "process-" + System.currentTimeMillis();
+        ReceiverComponent rec = new ReceiverComponent() {
+            @Override
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                String corrId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+                receivedCorrId.set(corrId);
+                super.onMessageExchange(exchange);
+            }
+        };
+        activateComponent(rec, "target");
+        // Only part 0 of 2 is sent; presumably the 500 ms timeout is what flushes the aggregate.
+        String corrId = "split-" + System.currentTimeMillis();
+        InOnly me = client.createInOnlyExchange();
+        me.setProperty(JbiConstants.CORRELATION_ID, processCorrId);
+        me.setService(new QName("aggregator"));
+        me.getInMessage().setContent(createSource("<hello id='" + 0 + "' />"));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, Integer.valueOf(2));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, Integer.valueOf(0));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+        client.send(me);
+
+        rec.getMessageList().waitForMessagesToArrive(1);
+        assertEquals(processCorrId, receivedCorrId.get());
+    }
 }