You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/02 02:29:41 UTC
svn commit: r709826 - in
/servicemix/components/engines/servicemix-eip/trunk/src:
main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
Author: gnodet
Date: Sat Nov 1 18:29:41 2008
New Revision: 709826
URL: http://svn.apache.org/viewvc?rev=709826&view=rev
Log:
SM-1567: smx-eip AbstractAggregator should support boolean property 'reportErrors'
Added:
servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=709826&r1=709825&r2=709826&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Sat Nov 1 18:29:41 2008
@@ -17,6 +17,8 @@
package org.apache.servicemix.eip.support;
import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
@@ -70,6 +72,10 @@
private boolean copyProperties = true;
private boolean copyAttachments = true;
+
+ private boolean reportErrors = false;
+
+ private boolean reportClosedAggregatesAsErrors = false;
private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
@@ -130,7 +136,42 @@
public void setCopyAttachments(boolean copyAttachments) {
this.copyAttachments = copyAttachments;
}
-
+
+ public boolean isReportErrors() {
+ return reportErrors;
+ }
+
+ /**
+ * Sets whether the aggregator should report errors happening when sending the
+ * aggregate on all exchanges that compose the aggregate.
+ * The default value is <code>false</code>, meaning that if any error occurs, this
+ * error will be lost.
+ * Note that if this flag is set to <code>true</code>, all exchanges received as part of a given aggregate
+ * will be held until the aggregate is sent and the DONE / ERROR status is received back.
+ *
+ * @param reportErrors <code>boolean</code> indicating if errors should be reported back to consumers
+ */
+ public void setReportErrors(boolean reportErrors) {
+ this.reportErrors = reportErrors;
+ }
+
+ public boolean isReportClosedAggregatesAsErrors() {
+ return reportClosedAggregatesAsErrors;
+ }
+
+ /**
+ * Sets whether the aggregator should report errors on incoming exchanges received after a given
+ * aggregate has been closed.
+ * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+ * with a DONE status and discarded with respect to the aggregation process.
+ *
+ * @param reportClosedAggregatesAsErrors <code>boolean</code> indicating if exchanges received for a
+ * closed aggregate should be sent back with an ERROR status
+ */
+ public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
+ this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
+ }
+
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
@@ -176,23 +217,36 @@
* @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
*/
public void process(MessageExchange exchange) throws Exception {
- // Skip DONE
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- // Skip ERROR
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- // Handle an ACTIVE exchange as a PROVIDER
- } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // Handle an exchange as a PROVIDER
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ // ignore DONE / ERROR status from consumers
+ return;
+ }
if (!(exchange instanceof InOnly)
&& !(exchange instanceof RobustInOnly)) {
fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
} else {
processProvider(exchange);
}
- // Handle an ACTIVE exchange as a CONSUMER
- } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- done(exchange);
+ // Handle an exchange as a CONSUMER
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ throw new IllegalStateException("Unexpected active consumer exchange received");
+ }
+ if (reportErrors) {
+ String corrId = (String) exchange.getProperty(getService().toString() + ":" + getEndpoint() + ":correlation");
+ List<MessageExchange> exchanges = (List<MessageExchange>) store.load(corrId + "-exchanges");
+ if (exchanges != null) {
+ for (MessageExchange me : exchanges) {
+ if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ exchange.setError(me.getError());
+ }
+ me.setStatus(exchange.getStatus());
+ send(me);
+ }
+ }
+ }
}
}
@@ -223,6 +277,14 @@
}
// If the aggregation is not closed
if (aggregation != null) {
+ if (reportErrors) {
+ List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+ if (exchanges == null) {
+ exchanges = new ArrayList<MessageExchange>();
+ }
+ exchanges.add(exchange);
+ store.store(correlationId + "-exchanges", exchanges);
+ }
if (addMessage(aggregation, in, exchange)) {
sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
} else {
@@ -239,8 +301,16 @@
timers.put(correlationId, t);
}
}
+ if (!reportErrors) {
+ done(exchange);
+ }
+ } else {
+ if (reportClosedAggregatesAsErrors) {
+ fail(exchange, new ClosedAggregateException());
+ } else {
+ done(exchange);
+ }
}
- done(exchange);
} finally {
lock.unlock();
}
@@ -255,6 +325,7 @@
if (processCorrelationId != null) {
me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
}
+ me.setProperty(getService().toString() + ":" + getEndpoint() + ":correlation", correlationId);
target.configureTarget(me, getContext());
NormalizedMessage nm = me.createMessage();
me.setInMessage(nm);
@@ -379,4 +450,10 @@
NormalizedMessage message,
MessageExchange exchange,
boolean timeout) throws Exception;
+
+ /**
+ * Error used to report that the aggregate has already been closed
+ */
+ public static class ClosedAggregateException extends Exception {
+ }
}
Added: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java?rev=709826&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java (added)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java Sat Nov 1 18:29:41 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.xml.namespace.QName;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.eip.patterns.StaticRecipientList;
+import org.apache.servicemix.eip.patterns.RecipientListAggregator;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+
+public class RecipientListAggregatorTest extends AbstractEIPTest {
+
+ private StaticRecipientList recipientList; // pattern activated under the name "recipientList"
+ private RecipientListAggregator aggregator; // pattern activated under the name "aggregator"
+
+ public void testReportErrors() throws Exception { // chain: recipientList -> aggregator -> "error"; the sender must see ERROR
+ recipientList = new StaticRecipientList();
+ recipientList.setReportErrors(true);
+ recipientList.setRecipients(new ExchangeTarget[] { // both recipients point at the same "aggregator" service
+ createServiceExchangeTarget(new QName("aggregator")),
+ createServiceExchangeTarget(new QName("aggregator")),
+ });
+ configurePattern(recipientList);
+ activateComponent(recipientList, "recipientList");
+
+ aggregator = new RecipientListAggregator();
+ aggregator.setReportErrors(true); // hold incoming exchanges until the aggregate's DONE / ERROR comes back
+ aggregator.setTarget(createServiceExchangeTarget(new QName("error"))); // the aggregate is sent to the "error" service
+ configurePattern(aggregator);
+ activateComponent(aggregator, "aggregator");
+
+ activateComponent(new ReturnErrorComponent(), "error"); // presumably always answers with ERROR - confirm in ReturnErrorComponent
+
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("recipientList")); // entry point of the chain
+ NormalizedMessage message = me.getMessage("in");
+ message.setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus()); // the error has to be reported back to the original sender
+
+ listener.assertExchangeCompleted();
+ }
+
+}