You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/02 02:29:41 UTC

svn commit: r709826 - in /servicemix/components/engines/servicemix-eip/trunk/src: main/java/org/apache/servicemix/eip/support/AbstractAggregator.java test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java

Author: gnodet
Date: Sat Nov  1 18:29:41 2008
New Revision: 709826

URL: http://svn.apache.org/viewvc?rev=709826&view=rev
Log:
SM-1567: smx-eip AbstractAggregator should support boolean property 'reportErrors'

Added:
    servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=709826&r1=709825&r2=709826&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Sat Nov  1 18:29:41 2008
@@ -17,6 +17,8 @@
 package org.apache.servicemix.eip.support;
 
 import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
@@ -70,6 +72,10 @@
     private boolean copyProperties = true;
 
     private boolean copyAttachments = true;
+
+    private boolean reportErrors = false;
+
+    private boolean reportClosedAggregatesAsErrors = false;
     
     private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
     
@@ -130,7 +136,42 @@
     public void setCopyAttachments(boolean copyAttachments) {
         this.copyAttachments = copyAttachments;
     }
-    
+
+    public boolean isReportErrors() {
+        return reportErrors;
+    }
+
+    /**
+     * Sets wether the aggregator should report errors happening when sending the
+     * aggregate on all exchanges that compose the aggregate.
+     * The default value is <code>false</code>, meaning that if any error occur, this
+     * error will be lost.
+     * Note that if this flag is set to <code>true</code>, all exchanges received as part of a given aggregate
+     * will be hold until the aggregate is sent and the DONE / ERROR status is received back.
+     *
+     * @param reportErrors <code>boolean</code> indicating if errors should be reported back to consumers
+     */
+    public void setReportErrors(boolean reportErrors) {
+        this.reportErrors = reportErrors;
+    }
+
+    public boolean isReportClosedAggregatesAsErrors() {
+        return reportClosedAggregatesAsErrors;
+    }
+
+    /**
+     * Sets wether the aggregator should report errors on incoming exchanges received after a given
+     * aggregate has been closed.
+     * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+     * with a DONE status and discarded with respect to the aggregation process.
+     *
+     * @param reportClosedAggregatesAsErrors <code>boolean</code> indicating if exchanges received for a
+     *           closed aggregates should be send back with an ERROR status
+     */
+    public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
+        this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
+    }
+
     /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
@@ -176,23 +217,36 @@
      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
      */
     public void process(MessageExchange exchange) throws Exception {
-        // Skip DONE
-        if (exchange.getStatus() == ExchangeStatus.DONE) {
-            return;
-        // Skip ERROR
-        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-            return;
-        // Handle an ACTIVE exchange as a PROVIDER
-        } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+        // Handle an exchange as a PROVIDER
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+                // ignore DONE / ERROR status from consumers
+                return;
+            }
             if (!(exchange instanceof InOnly)
                 && !(exchange instanceof RobustInOnly)) {
                 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
             } else {
                 processProvider(exchange);
             }
-        // Handle an ACTIVE exchange as a CONSUMER
-        } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-            done(exchange);
+        // Handle an exchange as a CONSUMER
+        } else {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                throw new IllegalStateException("Unexpected active consumer exchange received");
+            }
+            if (reportErrors) {
+                String corrId = (String) exchange.getProperty(getService().toString() + ":" + getEndpoint() + ":correlation");
+                List<MessageExchange> exchanges = (List<MessageExchange>) store.load(corrId + "-exchanges");
+                if (exchanges != null) {
+                    for (MessageExchange me : exchanges) {
+                        if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                            exchange.setError(me.getError());
+                        }
+                        me.setStatus(exchange.getStatus());
+                        send(me);
+                    }
+                }
+            }
         }
     }
 
@@ -223,6 +277,14 @@
             }
             // If the aggregation is not closed
             if (aggregation != null) {
+                if (reportErrors) {
+                    List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+                    if (exchanges == null) {
+                        exchanges = new ArrayList<MessageExchange>();
+                    }
+                    exchanges.add(exchange);
+                    store.store(correlationId + "-exchanges", exchanges);
+                }
                 if (addMessage(aggregation, in, exchange)) {
                     sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
                 } else {
@@ -239,8 +301,16 @@
                         timers.put(correlationId, t);
                     }
                 }
+                if (!reportErrors) {
+                    done(exchange);
+                }
+            } else {
+                if (reportClosedAggregatesAsErrors) {
+                    fail(exchange, new ClosedAggregateException());
+                } else {
+                    done(exchange);
+                }
             }
-            done(exchange);
         } finally {
             lock.unlock();
         }
@@ -255,6 +325,7 @@
         if (processCorrelationId != null) {
             me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
         }
+        me.setProperty(getService().toString() + ":" + getEndpoint() + ":correlation", correlationId);
         target.configureTarget(me, getContext());
         NormalizedMessage nm = me.createMessage();
         me.setInMessage(nm);
@@ -379,4 +450,10 @@
                                            NormalizedMessage message,
                                            MessageExchange exchange,
                                            boolean timeout) throws Exception;
+
+    /**
+     * Error used to report that the aggregate has already been closed
+     */
+    public static class ClosedAggregateException extends Exception {
+    }
 }

Added: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java?rev=709826&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java (added)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java Sat Nov  1 18:29:41 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.xml.namespace.QName;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.eip.patterns.StaticRecipientList;
+import org.apache.servicemix.eip.patterns.RecipientListAggregator;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+
+public class RecipientListAggregatorTest extends AbstractEIPTest {
+
+    private StaticRecipientList recipientList;
+    private RecipientListAggregator aggregator;
+
+    public void testReportErrors() throws Exception {
+        recipientList = new StaticRecipientList();
+        recipientList.setReportErrors(true);
+        recipientList.setRecipients(new ExchangeTarget[] {
+                createServiceExchangeTarget(new QName("aggregator")),
+                createServiceExchangeTarget(new QName("aggregator")),
+        });
+        configurePattern(recipientList);
+        activateComponent(recipientList, "recipientList");
+
+        aggregator = new RecipientListAggregator();
+        aggregator.setReportErrors(true);
+        aggregator.setTarget(createServiceExchangeTarget(new QName("error")));
+        configurePattern(aggregator);
+        activateComponent(aggregator, "aggregator");
+
+        activateComponent(new ReturnErrorComponent(), "error");
+
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("recipientList"));
+        NormalizedMessage message = me.getMessage("in");
+        message.setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+
+        listener.assertExchangeCompleted();
+    }
+
+}