You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/03 15:32:29 UTC

svn commit: r710067 - in /servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src: main/java/org/apache/servicemix/eip/patterns/ main/java/org/apache/servicemix/eip/support/ test/java/org/apache/servicemix/eip/

Author: gnodet
Date: Mon Nov  3 06:32:28 2008
New Revision: 710067

URL: http://svn.apache.org/viewvc?rev=710067&view=rev
Log:
Backport SM-1549 and SM-1411

Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java?rev=710067&r1=710066&r2=710067&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java Mon Nov  3 06:32:28 2008
@@ -252,6 +252,12 @@
                 } else {
                     root.appendChild(doc.importNode(elem, true));
                 }
+                if (isCopyProperties()) {
+                    copyProperties(messages[i], message);
+                }
+                if (isCopyAttachments()) {
+                    copyAttachments(messages[i], message);
+                }
             }
         }
         message.setContent(new DOMSource(doc));

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=710067&r1=710066&r2=710067&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Nov  3 06:32:28 2008
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 
 import javax.jbi.messaging.ExchangeStatus;
@@ -76,6 +77,8 @@
 
     private boolean reportClosedAggregatesAsErrors;
     
+    private boolean reportTimeoutAsErrors;
+    
     private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
     
     /**
@@ -170,6 +173,26 @@
     public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
         this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
     }
+    
+    /**
+     * Sets whether the aggregator should reports errors on incoming exchanges already received when
+     * a timeout occurs.
+     * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+     * a DONE status.
+     *  
+     * @param reportTimeoutAsErrors <code>boolean</code> indicating if exchanges received prior to a
+     *        timeout should be sent back with an ERROR status
+     */
+    public void setReportTimeoutAsErrors(boolean reportTimeoutAsErrors) {
+        this.reportTimeoutAsErrors = reportTimeoutAsErrors;
+    }
+
+    /**
+     * @return the reportTimeoutAsErrors
+     */
+    public boolean isReportTimeoutAsErrors() {
+        return reportTimeoutAsErrors;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
@@ -210,6 +233,10 @@
             closedAggregatesStoreFactory = new MemoryStoreFactory();
         }
         closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+        if (reportTimeoutAsErrors && !reportErrors) {
+            throw new IllegalArgumentException(
+                "ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
+        }
     }
 
     /* (non-Javadoc)
@@ -352,7 +379,20 @@
             timers.remove(correlationId);
             Object aggregation = store.load(correlationId);
             if (aggregation != null) {
-                sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+                if (reportTimeoutAsErrors) {
+                    List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+                    if (exchanges != null) {
+                        TimeoutException timeoutException = new TimeoutException();
+                        for (MessageExchange me : exchanges) {
+                            me.setError(timeoutException);
+                            me.setStatus(ExchangeStatus.ERROR);
+                            send(me);
+                        }
+                    }
+                    closeAggregation(correlationId);
+                } else {
+                    sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
+                }
             } else if (!isAggregationClosed(correlationId)) {
                 throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
             } else {

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=710067&r1=710066&r2=710067&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java Mon Nov  3 06:32:28 2008
@@ -16,12 +16,20 @@
  */
 package org.apache.servicemix.eip;
 
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.eip.patterns.SplitAggregator;
 import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.tck.ReceiverComponent;
 
 public class SplitAggregatorTest extends AbstractEIPTest {
@@ -33,14 +41,16 @@
 
         aggregator = new SplitAggregator();
         aggregator.setTarget(createServiceExchangeTarget(new QName("target")));
+        aggregator.setCopyProperties(true);
         configurePattern(aggregator);
         activateComponent(aggregator, "aggregator");
     }
     
-    protected NormalizedMessage testRun(boolean[] msgs) throws Exception {
+    protected NormalizedMessage testRun(boolean[] msgs, boolean reportTimeoutAsErrors) throws Exception {
         ReceiverComponent rec = activateReceiver("target");
         
         int nbMessages = 3;
+        int nbMessagesSent = 0;
         String corrId = Long.toString(System.currentTimeMillis());
         for (int i = 0; i < 3; i++) {
             if (msgs == null || msgs[i]) {
@@ -50,27 +60,109 @@
                 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(nbMessages));
                 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(i));
                 me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+                me.getInMessage().setProperty("prop", "value");
                 client.send(me);
+                nbMessagesSent++;
             }
         }        
         
-        rec.getMessageList().assertMessagesReceived(1);
-        return (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+        NormalizedMessage msg = null;
+        if (reportTimeoutAsErrors && (nbMessages != nbMessagesSent)) {
+            for (int i = 0; i < nbMessagesSent; i++) {
+                MessageExchange me = (InOnly)client.receive();
+                assertEquals(ExchangeStatus.ERROR, me.getStatus());
+            }
+        } else {
+            rec.getMessageList().assertMessagesReceived(1);
+            msg = (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+            int nbElements = new SourceTransformer().toDOMElement(msg).getChildNodes().getLength();
+            assertEquals(nbMessagesSent, nbElements);
+            assertEquals("value", msg.getProperty("prop"));
+        }
+        return msg;
+    }
+
+    public void testWithoutReportErrorsAndWithReportTimeoutAsErrors() {
+        SplitAggregator aggr = new SplitAggregator();
+        aggr.setTarget(createServiceExchangeTarget(new QName("target")));
+        aggr.setReportErrors(false);
+        aggr.setReportTimeoutAsErrors(true);
+        configurePattern(aggr);
+        try {
+            activateComponent(aggr, "aggr");
+            fail("An IllegalArgumentException should have been thrown!");
+        } catch (Exception e) {
+            assertTrue(e instanceof JBIException);
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+        }
     }
     
     public void testSimple() throws Exception {
-        aggregator.setTimeout(500);
-        testRun(null);
+        aggregator.setTimeout(5000);
+        testRun(null, false);
     }
     
     public void testSimpleWithQNames() throws Exception {
         aggregator.setAggregateElementName(new QName("uri:test", "agg", "sm"));
         aggregator.setMessageElementName(new QName("uri:test", "msg", "sm"));
-        testRun(null);
+        testRun(null, false);
     }
     
     public void testWithTimeout() throws Exception {
         aggregator.setTimeout(500);
-        testRun(new boolean[] {true, false, true });
+        testRun(new boolean[] {true, false, true }, false);
+    }
+    
+    public void testWithTimeoutReportedAsErrors() throws Exception {
+        aggregator.setTimeout(500);
+        aggregator.setReportErrors(true);
+        aggregator.setReportTimeoutAsErrors(true);
+        testRun(new boolean[] {true, false, true }, true);
+    }
+    
+    public void testProcessCorrelationIdPropagationWithTimeout() throws Exception {
+        aggregator.setTimeout(500);
+        aggregator.setReportTimeoutAsErrors(false);
+
+        final AtomicReference<String> receivedCorrId = new AtomicReference<String>();
+
+        final String processCorrId = Long.toString(System.currentTimeMillis());
+        ReceiverComponent rec = new ReceiverComponent() {
+            @Override
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                String corrId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
+                receivedCorrId.set(corrId);
+                super.onMessageExchange(exchange);
+            }
+        };
+        activateComponent(rec, "target");
+
+        String corrId = Long.toString(System.currentTimeMillis());
+        InOnly me = client.createInOnlyExchange();
+        me.setProperty(JbiConstants.CORRELATION_ID, processCorrId);
+        me.setService(new QName("aggregator"));
+        me.getInMessage().setContent(createSource("<hello id='" + 0 + "' />"));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(2));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(0));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+        client.send(me);
+
+        rec.getMessageList().waitForMessagesToArrive(1);
+        rec.getMessageList().flushMessages();
+        assertEquals(processCorrId, receivedCorrId.get());
+
+        me = client.createInOnlyExchange();
+        me.setProperty(JbiConstants.CORRELATION_ID, processCorrId);
+        me.setService(new QName("aggregator"));
+        me.getInMessage().setContent(createSource("<hello id='" + 0 + "' />"));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(2));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(1));
+        me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+        client.sendSync(me);
+
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+        Thread.sleep(500);
+        rec.getMessageList().assertMessagesReceived(0);
     }
 }