You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/03 13:36:41 UTC

svn commit: r710044 - in /servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src: main/java/org/apache/servicemix/eip/patterns/ main/java/org/apache/servicemix/eip/support/ test/java/org/apache/servicemix/eip/

Author: gnodet
Date: Mon Nov  3 04:36:39 2008
New Revision: 710044

URL: http://svn.apache.org/viewvc?rev=710044&view=rev
Log:
SM-1567: smx-eip AbstractAggregator should support boolean property 'reportErrors'

Added:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=710044&r1=710043&r2=710044&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Mon Nov  3 04:36:39 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.servicemix.eip.patterns;
 
+import java.util.concurrent.locks.Lock;
+
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
@@ -126,6 +128,11 @@
             if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
                 fail(exchange, me.getError());
                 return;
+            } else if (me.getFault() != null && reportErrors) {
+                MessageUtil.transferFaultToFault(me, exchange);
+                sendSync(exchange);
+                done(me);
+                return;
             }
         }
         done(exchange);
@@ -135,26 +142,63 @@
      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
      */
     protected void processAsync(MessageExchange exchange) throws Exception {
-        // If we need to report errors, the behavior is really different,
-        // as we need to keep the incoming exchange in the store until
-        // all acks have been received
-        if (reportErrors) {
-            // TODO: implement this
-            throw new UnsupportedOperationException("Not implemented");
-        // We are in a simple fire-and-forget behaviour.
-        // This implementation is really efficient as we do not use
-        // the store at all.
+        if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+            String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
+            int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
+            Lock lock = lockManager.getLock(corrId);
+            lock.lock();
+            try {
+                Integer acks = (Integer) store.load(corrId + ".acks");
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    // If the acks integer is not here anymore, the message response has been sent already
+                    if (acks != null) {
+                        if (acks + 1 >= count) {
+                            MessageExchange me = (MessageExchange) store.load(corrId);
+                            done(me);
+                        } else {
+                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                        }
+                    }
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                    // If the acks integer is not here anymore, the message response has been sent already
+                    if (acks != null) {
+                        if (reportErrors) {
+                            MessageExchange me = (MessageExchange) store.load(corrId);
+                            fail(me, exchange.getError());
+                        } else  if (acks + 1 >= count) {
+                            MessageExchange me = (MessageExchange) store.load(corrId);
+                            done(me);
+                        } else {
+                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                        }
+                    }
+                } else if (exchange.getFault() != null) {
+                    // If the acks integer is not here anymore, the message response has been sent already
+                    if (acks != null) {
+                        if (reportErrors) {
+                            MessageExchange me = (MessageExchange) store.load(corrId);
+                            MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
+                            send(me);
+                            done(exchange);
+                        } else  if (acks + 1 >= count) {
+                            MessageExchange me = (MessageExchange) store.load(corrId);
+                            done(me);
+                        } else {
+                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                        }
+                    } else {
+                        done(exchange);
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
         } else {
-            if (exchange.getStatus() == ExchangeStatus.DONE) {
-                return;
-            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                return;
-            } else if (!(exchange instanceof InOnly)
-                       && !(exchange instanceof RobustInOnly)) {
+            if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
                 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-            } else if (exchange.getFault() != null) {
-                done(exchange);
-            } else {
+            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                store.store(exchange.getExchangeId(), exchange);
+                store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
                 NormalizedMessage in = MessageUtil.copyIn(exchange);
                 for (int i = 0; i < recipients.length; i++) {
                     MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
@@ -165,7 +209,6 @@
                     MessageUtil.transferToIn(in, me);
                     send(me);
                 }
-                done(exchange);
             }
         }
     }

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=710044&r1=710043&r2=710044&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Nov  3 04:36:39 2008
@@ -16,7 +16,9 @@
  */
 package org.apache.servicemix.eip.support;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
@@ -69,6 +71,10 @@
     private boolean copyProperties = true;
 
     private boolean copyAttachments = true;
+
+    private boolean reportErrors;
+
+    private boolean reportClosedAggregatesAsErrors;
     
     private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
     
@@ -129,7 +135,42 @@
     public void setCopyAttachments(boolean copyAttachments) {
         this.copyAttachments = copyAttachments;
     }
-    
+
+    public boolean isReportErrors() {
+        return reportErrors;
+    }
+
+    /**
+     * Sets whether the aggregator should report errors happening when sending the
+     * aggregate on all exchanges that compose the aggregate.
+     * The default value is <code>false</code>, meaning that if any error occur, this
+     * error will be lost.
+     * Note that if this flag is set to <code>true</code>, all exchanges received as part of a given aggregate
+     * will be hold until the aggregate is sent and the DONE / ERROR status is received back.
+     *
+     * @param reportErrors <code>boolean</code> indicating if errors should be reported back to consumers
+     */
+    public void setReportErrors(boolean reportErrors) {
+        this.reportErrors = reportErrors;
+    }
+
+    public boolean isReportClosedAggregatesAsErrors() {
+        return reportClosedAggregatesAsErrors;
+    }
+
+    /**
+     * Sets whether the aggregator should report errors on incoming exchanges received after a given
+     * aggregate has been closed.
+     * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+     * with a DONE status and discarded with respect to the aggregation process.
+     *
+     * @param reportClosedAggregatesAsErrors <code>boolean</code> indicating if exchanges received for a
+     *           closed aggregates should be send back with an ERROR status
+     */
+    public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
+        this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
+    }
+
     /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
@@ -175,23 +216,36 @@
      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
      */
     public void process(MessageExchange exchange) throws Exception {
-        // Skip DONE
-        if (exchange.getStatus() == ExchangeStatus.DONE) {
-            return;
-        // Skip ERROR
-        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-            return;
-        // Handle an ACTIVE exchange as a PROVIDER
-        } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+        // Handle an exchange as a PROVIDER
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+                // ignore DONE / ERROR status from consumers
+                return;
+            }
             if (!(exchange instanceof InOnly)
                 && !(exchange instanceof RobustInOnly)) {
                 fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
             } else {
                 processProvider(exchange);
             }
-        // Handle an ACTIVE exchange as a CONSUMER
-        } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-            done(exchange);
+        // Handle an exchange as a CONSUMER
+        } else {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                throw new IllegalStateException("Unexpected active consumer exchange received");
+            }
+            if (reportErrors) {
+                String corrId = (String) exchange.getProperty(getService().toString() + ":" + getEndpoint() + ":correlation");
+                List<MessageExchange> exchanges = (List<MessageExchange>) store.load(corrId + "-exchanges");
+                if (exchanges != null) {
+                    for (MessageExchange me : exchanges) {
+                        if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                            me.setError(exchange.getError());
+                        }
+                        me.setStatus(exchange.getStatus());
+                        send(me);
+                    }
+                }
+            }
         }
     }
 
@@ -222,6 +276,14 @@
             }
             // If the aggregation is not closed
             if (aggregation != null) {
+                if (reportErrors) {
+                    List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+                    if (exchanges == null) {
+                        exchanges = new ArrayList<MessageExchange>();
+                    }
+                    exchanges.add(exchange);
+                    store.store(correlationId + "-exchanges", exchanges);
+                }
                 if (addMessage(aggregation, in, exchange)) {
                     sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
                 } else {
@@ -238,8 +300,16 @@
                         timers.put(correlationId, t);
                     }
                 }
+                if (!reportErrors) {
+                    done(exchange);
+                }
+            } else {
+                if (reportClosedAggregatesAsErrors) {
+                    fail(exchange, new ClosedAggregateException());
+                } else {
+                    done(exchange);
+                }
             }
-            done(exchange);
         } finally {
             lock.unlock();
         }
@@ -254,6 +324,7 @@
         if (processCorrelationId != null) {
             me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
         }
+        me.setProperty(getService().toString() + ":" + getEndpoint() + ":correlation", correlationId);
         target.configureTarget(me, getContext());
         NormalizedMessage nm = me.createMessage();
         me.setInMessage(nm);
@@ -378,4 +449,10 @@
                                            NormalizedMessage message,
                                            MessageExchange exchange,
                                            boolean timeout) throws Exception;
+
+    /**
+     * Error used to report that the aggregate has already been closed
+     */
+    public static class ClosedAggregateException extends Exception {
+    }
 }

Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java?rev=710044&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java Mon Nov  3 04:36:39 2008
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.RecipientListAggregator;
+import org.apache.servicemix.eip.patterns.StaticRecipientList;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+
+public class RecipientListAggregatorTest extends AbstractEIPTest {
+
+    private StaticRecipientList recipientList;
+    private RecipientListAggregator aggregator;
+
+    public void testReportErrors() throws Exception {
+        recipientList = new StaticRecipientList();
+        recipientList.setReportErrors(true);
+        recipientList.setRecipients(new ExchangeTarget[] {
+                createServiceExchangeTarget(new QName("aggregator")),
+                createServiceExchangeTarget(new QName("aggregator")),
+        });
+        configurePattern(recipientList);
+        activateComponent(recipientList, "recipientList");
+
+        aggregator = new RecipientListAggregator();
+        aggregator.setReportErrors(true);
+        aggregator.setTarget(createServiceExchangeTarget(new QName("error")));
+        configurePattern(aggregator);
+        activateComponent(aggregator, "aggregator");
+
+        activateComponent(new ReturnErrorComponent(), "error");
+
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("recipientList"));
+        NormalizedMessage message = me.getMessage("in");
+        message.setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+        assertNotNull(me.getError());
+        assertEquals("Dummy error", me.getError().getMessage());
+
+        listener.assertExchangeCompleted();
+    }
+
+}