You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/03 15:00:39 UTC

svn commit: r710057 - in /servicemix/components/engines/servicemix-eip/trunk/src: main/java/org/apache/servicemix/eip/support/AbstractAggregator.java test/java/org/apache/servicemix/eip/SplitAggregatorTest.java

Author: gnodet
Date: Mon Nov  3 06:00:39 2008
New Revision: 710057

URL: http://svn.apache.org/viewvc?rev=710057&view=rev
Log:
SM-1549: servicemix-eip AbstractAggregator should support boolean property 'reportTimeoutsAsErrors'

Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=710057&r1=710056&r2=710057&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Nov  3 06:00:39 2008
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 
 import javax.jbi.messaging.ExchangeStatus;
@@ -77,6 +78,8 @@
 
     private boolean reportClosedAggregatesAsErrors = false;
     
+    private boolean reportTimeoutAsErrors;
+    
     private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
     
     /**
@@ -171,6 +174,26 @@
     public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
         this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
     }
+    
+    /**
+     * Sets whether the aggregator should reports errors on incoming exchanges already received when
+     * a timeout occurs.
+     * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+     * a DONE status.
+     *  
+     * @param reportTimeoutAsErrors <code>boolean</code> indicating if exchanges received prior to a
+     * 			timeout should be sent back with an ERROR status
+     */
+    public void setReportTimeoutAsErrors(boolean reportTimeoutAsErrors) {
+        this.reportTimeoutAsErrors = reportTimeoutAsErrors;
+    }
+
+    /**
+     * @return the reportTimeoutAsErrors
+     */
+    public boolean isReportTimeoutAsErrors() {
+        return reportTimeoutAsErrors;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
@@ -211,6 +234,10 @@
             closedAggregatesStoreFactory = new MemoryStoreFactory();
         }
         closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+        if (reportTimeoutAsErrors && !reportErrors) {
+        	throw new IllegalArgumentException(
+        			"ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
+        }
     }
 
     /* (non-Javadoc)
@@ -353,7 +380,20 @@
             timers.remove(correlationId);
             Object aggregation = store.load(correlationId);
             if (aggregation != null) {
-                sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+                if (reportTimeoutAsErrors) {
+                    List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+                    if (exchanges != null) {
+                        TimeoutException timeoutException = new TimeoutException();
+                        for (MessageExchange me : exchanges) {
+                            me.setError(timeoutException);
+                            me.setStatus(ExchangeStatus.ERROR);
+                            send(me);
+                        }
+                    }
+                    closeAggregation(correlationId);
+            	} else {
+                    sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+                }
             } else if (!isAggregationClosed(correlationId)) {
                 throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
             } else {

Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=710057&r1=710056&r2=710057&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java Mon Nov  3 06:00:39 2008
@@ -18,6 +18,7 @@
 
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.jbi.JBIException;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
@@ -28,6 +29,7 @@
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.eip.patterns.SplitAggregator;
 import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.tck.ReceiverComponent;
 
 public class SplitAggregatorTest extends AbstractEIPTest {
@@ -44,10 +46,11 @@
         activateComponent(aggregator, "aggregator");
     }
     
-    protected NormalizedMessage testRun(boolean[] msgs) throws Exception {
+    protected NormalizedMessage testRun(boolean[] msgs, boolean reportTimeoutAsErrors) throws Exception {
         ReceiverComponent rec = activateReceiver("target");
         
         int nbMessages = 3;
+        int nbMessagesSent = 0;
         String corrId = Long.toString(System.currentTimeMillis());
         for (int i = 0; i < 3; i++) {
             if (msgs == null || msgs[i]) {
@@ -59,33 +62,67 @@
                 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
                 me.getInMessage().setProperty("prop", "value");
                 client.send(me);
+                nbMessagesSent++;
             }
         }        
         
-        rec.getMessageList().assertMessagesReceived(1);
-        NormalizedMessage msg = (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
-        assertEquals("value", msg.getProperty("prop"));
+        NormalizedMessage msg = null;
+        if (reportTimeoutAsErrors && (nbMessages != nbMessagesSent)) {
+            for (int i = 0; i < nbMessagesSent; i++) {
+                MessageExchange me = (InOnly)client.receive();
+                assertEquals(ExchangeStatus.ERROR, me.getStatus());
+            }
+        } else {
+            rec.getMessageList().assertMessagesReceived(1);
+            msg = (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+            int nbElements = new SourceTransformer().toDOMElement(msg).getChildNodes().getLength();
+            assertEquals(nbMessagesSent, nbElements);
+            assertEquals("value", msg.getProperty("prop"));
+        }
         return msg;
     }
+
+    public void testWithoutReportErrorsAndWithReportTimeoutAsErrors() {
+        SplitAggregator aggr = new SplitAggregator();
+        aggr.setTarget(createServiceExchangeTarget(new QName("target")));
+        aggr.setReportErrors(false);
+        aggr.setReportTimeoutAsErrors(true);
+        configurePattern(aggr);
+        try {
+        	activateComponent(aggr, "aggr");
+			fail("An IllegalArgumentException should have been thrown!");
+		} catch (Exception e) {
+			assertTrue(e instanceof JBIException);
+			assertTrue(e.getCause() instanceof IllegalArgumentException);
+		}
+    }
     
     public void testSimple() throws Exception {
-        aggregator.setTimeout(500);
-        testRun(null);
+    	aggregator.setTimeout(5000);
+        testRun(null, false);
     }
     
     public void testSimpleWithQNames() throws Exception {
         aggregator.setAggregateElementName(new QName("uri:test", "agg", "sm"));
         aggregator.setMessageElementName(new QName("uri:test", "msg", "sm"));
-        testRun(null);
+        testRun(null, false);
     }
     
     public void testWithTimeout() throws Exception {
         aggregator.setTimeout(500);
-        testRun(new boolean[] {true, false, true });
+        testRun(new boolean[] {true, false, true }, false);
+    }
+    
+    public void testWithTimeoutReportedAsErrors() throws Exception {
+        aggregator.setTimeout(500);
+        aggregator.setReportErrors(true);
+        aggregator.setReportTimeoutAsErrors(true);
+        testRun(new boolean[] {true, false, true }, true);
     }
     
     public void testProcessCorrelationIdPropagationWithTimeout() throws Exception {
         aggregator.setTimeout(500);
+        aggregator.setReportTimeoutAsErrors(false);
 
         final AtomicReference<String> receivedCorrId = new AtomicReference<String>();