You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/03 13:36:41 UTC
svn commit: r710044 - in
/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src:
main/java/org/apache/servicemix/eip/patterns/
main/java/org/apache/servicemix/eip/support/
test/java/org/apache/servicemix/eip/
Author: gnodet
Date: Mon Nov 3 04:36:39 2008
New Revision: 710044
URL: http://svn.apache.org/viewvc?rev=710044&view=rev
Log:
SM-1567: smx-eip AbstractAggregator should support boolean property 'reportErrors'
Added:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=710044&r1=710043&r2=710044&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Mon Nov 3 04:36:39 2008
@@ -16,6 +16,8 @@
*/
package org.apache.servicemix.eip.patterns;
+import java.util.concurrent.locks.Lock;
+
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
@@ -126,6 +128,11 @@
if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
fail(exchange, me.getError());
return;
+ } else if (me.getFault() != null && reportErrors) {
+ MessageUtil.transferFaultToFault(me, exchange);
+ sendSync(exchange);
+ done(me);
+ return;
}
}
done(exchange);
@@ -135,26 +142,63 @@
* @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
*/
protected void processAsync(MessageExchange exchange) throws Exception {
- // If we need to report errors, the behavior is really different,
- // as we need to keep the incoming exchange in the store until
- // all acks have been received
- if (reportErrors) {
- // TODO: implement this
- throw new UnsupportedOperationException("Not implemented");
- // We are in a simple fire-and-forget behaviour.
- // This implementation is really efficient as we do not use
- // the store at all.
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+ String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
+ int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
+ Lock lock = lockManager.getLock(corrId);
+ lock.lock();
+ try {
+ Integer acks = (Integer) store.load(corrId + ".acks");
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // If the acks integer is not here anymore, the message response has been sent already
+ if (acks != null) {
+ if (acks + 1 >= count) {
+ MessageExchange me = (MessageExchange) store.load(corrId);
+ done(me);
+ } else {
+ store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ }
+ }
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ // If the acks integer is not here anymore, the message response has been sent already
+ if (acks != null) {
+ if (reportErrors) {
+ MessageExchange me = (MessageExchange) store.load(corrId);
+ fail(me, exchange.getError());
+ } else if (acks + 1 >= count) {
+ MessageExchange me = (MessageExchange) store.load(corrId);
+ done(me);
+ } else {
+ store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ }
+ }
+ } else if (exchange.getFault() != null) {
+ // If the acks integer is not here anymore, the message response has been sent already
+ if (acks != null) {
+ if (reportErrors) {
+ MessageExchange me = (MessageExchange) store.load(corrId);
+ MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
+ send(me);
+ done(exchange);
+ } else if (acks + 1 >= count) {
+ MessageExchange me = (MessageExchange) store.load(corrId);
+ done(me);
+ } else {
+ store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ }
+ } else {
+ done(exchange);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
} else {
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- } else if (!(exchange instanceof InOnly)
- && !(exchange instanceof RobustInOnly)) {
+ if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
- } else if (exchange.getFault() != null) {
- done(exchange);
- } else {
+ } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ store.store(exchange.getExchangeId(), exchange);
+ store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
NormalizedMessage in = MessageUtil.copyIn(exchange);
for (int i = 0; i < recipients.length; i++) {
MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
@@ -165,7 +209,6 @@
MessageUtil.transferToIn(in, me);
send(me);
}
- done(exchange);
}
}
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=710044&r1=710043&r2=710044&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Nov 3 04:36:39 2008
@@ -16,7 +16,9 @@
*/
package org.apache.servicemix.eip.support;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
@@ -69,6 +71,10 @@
private boolean copyProperties = true;
private boolean copyAttachments = true;
+
+ private boolean reportErrors;
+
+ private boolean reportClosedAggregatesAsErrors;
private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
@@ -129,7 +135,42 @@
public void setCopyAttachments(boolean copyAttachments) {
this.copyAttachments = copyAttachments;
}
-
+
+ public boolean isReportErrors() {
+ return reportErrors;
+ }
+
+ /**
+ * Sets whether the aggregator should report errors happening when sending the
+ * aggregate on all exchanges that compose the aggregate.
+ * The default value is <code>false</code>, meaning that if any error occur, this
+ * error will be lost.
+ * Note that if this flag is set to <code>true</code>, all exchanges received as part of a given aggregate
+ * will be hold until the aggregate is sent and the DONE / ERROR status is received back.
+ *
+ * @param reportErrors <code>boolean</code> indicating if errors should be reported back to consumers
+ */
+ public void setReportErrors(boolean reportErrors) {
+ this.reportErrors = reportErrors;
+ }
+
+ public boolean isReportClosedAggregatesAsErrors() {
+ return reportClosedAggregatesAsErrors;
+ }
+
+ /**
+ * Sets whether the aggregator should report errors on incoming exchanges received after a given
+ * aggregate has been closed.
+ * The default value is <code>false</code>, meaning that such exchanges will be silently sent back
+ * with a DONE status and discarded with respect to the aggregation process.
+ *
+ * @param reportClosedAggregatesAsErrors <code>boolean</code> indicating if exchanges received for a
+ * closed aggregates should be send back with an ERROR status
+ */
+ public void setReportClosedAggregatesAsErrors(boolean reportClosedAggregatesAsErrors) {
+ this.reportClosedAggregatesAsErrors = reportClosedAggregatesAsErrors;
+ }
+
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
@@ -175,23 +216,36 @@
* @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
*/
public void process(MessageExchange exchange) throws Exception {
- // Skip DONE
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- // Skip ERROR
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- // Handle an ACTIVE exchange as a PROVIDER
- } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // Handle an exchange as a PROVIDER
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ // ignore DONE / ERROR status from consumers
+ return;
+ }
if (!(exchange instanceof InOnly)
&& !(exchange instanceof RobustInOnly)) {
fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
} else {
processProvider(exchange);
}
- // Handle an ACTIVE exchange as a CONSUMER
- } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- done(exchange);
+ // Handle an exchange as a CONSUMER
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ throw new IllegalStateException("Unexpected active consumer exchange received");
+ }
+ if (reportErrors) {
+ String corrId = (String) exchange.getProperty(getService().toString() + ":" + getEndpoint() + ":correlation");
+ List<MessageExchange> exchanges = (List<MessageExchange>) store.load(corrId + "-exchanges");
+ if (exchanges != null) {
+ for (MessageExchange me : exchanges) {
+ if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ me.setError(exchange.getError());
+ }
+ me.setStatus(exchange.getStatus());
+ send(me);
+ }
+ }
+ }
}
}
@@ -222,6 +276,14 @@
}
// If the aggregation is not closed
if (aggregation != null) {
+ if (reportErrors) {
+ List<MessageExchange> exchanges = (List<MessageExchange>) store.load(correlationId + "-exchanges");
+ if (exchanges == null) {
+ exchanges = new ArrayList<MessageExchange>();
+ }
+ exchanges.add(exchange);
+ store.store(correlationId + "-exchanges", exchanges);
+ }
if (addMessage(aggregation, in, exchange)) {
sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
} else {
@@ -238,8 +300,16 @@
timers.put(correlationId, t);
}
}
+ if (!reportErrors) {
+ done(exchange);
+ }
+ } else {
+ if (reportClosedAggregatesAsErrors) {
+ fail(exchange, new ClosedAggregateException());
+ } else {
+ done(exchange);
+ }
}
- done(exchange);
} finally {
lock.unlock();
}
@@ -254,6 +324,7 @@
if (processCorrelationId != null) {
me.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
}
+ me.setProperty(getService().toString() + ":" + getEndpoint() + ":correlation", correlationId);
target.configureTarget(me, getContext());
NormalizedMessage nm = me.createMessage();
me.setInMessage(nm);
@@ -378,4 +449,10 @@
NormalizedMessage message,
MessageExchange exchange,
boolean timeout) throws Exception;
+
+ /**
+ * Error used to report that the aggregate has already been closed
+ */
+ public static class ClosedAggregateException extends Exception {
+ }
}
Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java?rev=710044&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/RecipientListAggregatorTest.java Mon Nov 3 04:36:39 2008
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.RecipientListAggregator;
+import org.apache.servicemix.eip.patterns.StaticRecipientList;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+
+public class RecipientListAggregatorTest extends AbstractEIPTest {
+
+ private StaticRecipientList recipientList;
+ private RecipientListAggregator aggregator;
+
+ public void testReportErrors() throws Exception {
+ recipientList = new StaticRecipientList();
+ recipientList.setReportErrors(true);
+ recipientList.setRecipients(new ExchangeTarget[] {
+ createServiceExchangeTarget(new QName("aggregator")),
+ createServiceExchangeTarget(new QName("aggregator")),
+ });
+ configurePattern(recipientList);
+ activateComponent(recipientList, "recipientList");
+
+ aggregator = new RecipientListAggregator();
+ aggregator.setReportErrors(true);
+ aggregator.setTarget(createServiceExchangeTarget(new QName("error")));
+ configurePattern(aggregator);
+ activateComponent(aggregator, "aggregator");
+
+ activateComponent(new ReturnErrorComponent(), "error");
+
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("recipientList"));
+ NormalizedMessage message = me.getMessage("in");
+ message.setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ assertNotNull(me.getError());
+ assertEquals("Dummy error", me.getError().getMessage());
+
+ listener.assertExchangeCompleted();
+ }
+
+}