You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/05 12:57:20 UTC
svn commit: r411741 [1/2] - in /incubator/servicemix/trunk/servicemix-eip:
./ src/main/java/org/apache/servicemix/eip/
src/main/java/org/apache/servicemix/eip/patterns/
src/main/java/org/apache/servicemix/eip/support/
src/test/java/org/apache/servicemix/eip/ ...
Author: gnodet
Date: Mon Jun 5 03:57:19 2006
New Revision: 411741
URL: http://svn.apache.org/viewvc?rev=411741&view=rev
Log:
Support xa transactions on seda flow (using sendSync)
Added:
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTransactionalTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/ContentBasedRouterTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/MessageFilterTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRecipientListTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRoutingSlipTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTxTest.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/XPathSplitterTxTest.java
Modified:
incubator/servicemix/trunk/servicemix-eip/pom.xml
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java
Modified: incubator/servicemix/trunk/servicemix-eip/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/pom.xml?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/pom.xml (original)
+++ incubator/servicemix/trunk/servicemix-eip/pom.xml Mon Jun 5 03:57:19 2006
@@ -53,6 +53,21 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>incubator-activemq</groupId>
+ <artifactId>activemq-ra</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>tranql</groupId>
+ <artifactId>tranql-connector</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java Mon Jun 5 03:57:19 2006
@@ -25,6 +25,7 @@
import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.common.BaseLifeCycle;
import org.apache.servicemix.common.Endpoint;
import org.apache.servicemix.common.ExchangeProcessor;
@@ -188,6 +189,12 @@
}
}
+ protected void sendSync(MessageExchange me) throws MessagingException {
+ if (!channel.sendSync(me)) {
+ throw new MessagingException("SendSync failed");
+ }
+ }
+
protected void done(MessageExchange me) throws MessagingException {
me.setStatus(ExchangeStatus.DONE);
send(me);
@@ -203,5 +210,21 @@
public void stop() {
}
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ */
+ public void process(MessageExchange exchange) throws Exception {
+ boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+ if (txSync && exchange.getRole() == Role.PROVIDER && exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ processSync(exchange);
+ } else {
+ processAsync(exchange);
+ }
+ }
+
+ protected abstract void processAsync(MessageExchange exchange) throws Exception;
+
+ protected abstract void processSync(MessageExchange exchange) throws Exception;
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java Mon Jun 5 03:57:19 2006
@@ -17,12 +17,11 @@
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
@@ -43,9 +42,6 @@
*/
public class ContentBasedRouter extends EIPEndpoint {
- private static final Log log = LogFactory.getLog(WireTap.class);
-
-
/**
* Routing rules that are evaluated to find the target destination
*/
@@ -83,69 +79,97 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processSync(MessageExchange exchange) throws Exception {
+ // Create exchange for target
+ MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+ // Now copy input to new exchange
+ // We need to read the message once for finding routing target
+ // so ensure we have a re-readable source
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ MessageUtil.transferToIn(in, tme);
+ // Retrieve target
+ ExchangeTarget target = getDestination(tme);
+ target.configureTarget(tme, getContext());
+ // Send in to target
+ sendSync(tme);
+ // Send back the result
+ if (tme.getStatus() == ExchangeStatus.DONE) {
+ done(exchange);
+ } else if (tme.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, tme.getError());
+ } else if (tme.getFault() != null) {
+ Fault fault = MessageUtil.copyFault(tme);
+ done(tme);
+ MessageUtil.transferToFault(fault, exchange);
+ sendSync(exchange);
+ } else if (tme.getMessage("out") != null) {
+ NormalizedMessage out = MessageUtil.copyOut(tme);
+ done(tme);
+ MessageUtil.transferToOut(out, exchange);
+ sendSync(exchange);
+ } else {
+ done(tme);
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
*/
- public void process(MessageExchange exchange) throws MessagingException {
- try {
- if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
- exchange.getProperty(correlation) == null) {
- // Create exchange for target
- MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
- if (store.hasFeature(Store.CLUSTERED)) {
- exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
- tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
- }
- // Set correlations
- tme.setProperty(correlation, exchange.getExchangeId());
- exchange.setProperty(correlation, tme.getExchangeId());
- // Put exchange to store
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+ exchange.getProperty(correlation) == null) {
+ // Create exchange for target
+ MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+ if (store.hasFeature(Store.CLUSTERED)) {
+ exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+ tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+ }
+ // Set correlations
+ tme.setProperty(correlation, exchange.getExchangeId());
+ exchange.setProperty(correlation, tme.getExchangeId());
+ // Put exchange to store
+ store.store(exchange.getExchangeId(), exchange);
+ // Now copy input to new exchange
+ // We need to read the message once for finding routing target
+ // so ensure we have a re-readable source
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ MessageUtil.transferToIn(in, tme);
+ // Retrieve target
+ ExchangeTarget target = getDestination(tme);
+ target.configureTarget(tme, getContext());
+ // Send in to target
+ send(tme);
+ // Mimic the exchange on the other side and send to needed listener
+ } else {
+ String id = (String) exchange.getProperty(correlation);
+ if (id == null) {
+ throw new IllegalStateException(correlation + " property not found");
+ }
+ MessageExchange org = (MessageExchange) store.load(id);
+ if (org == null) {
+ throw new IllegalStateException("Could not load original exchange with id " + id);
+ }
+ // Reproduce DONE status to the other side
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(org);
+ // Reproduce ERROR status to the other side
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(org, exchange.getError());
+ // Reproduce faults to the other side and listeners
+ } else if (exchange.getFault() != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferTo(exchange, org, "fault");
+ send(org);
+ // Reproduce answers to the other side
+ } else if (exchange.getMessage("out") != null) {
store.store(exchange.getExchangeId(), exchange);
- // Now copy input to new exchange
- // We need to read the message once for finding routing target
- // so ensure we have a re-readable source
- NormalizedMessage in = MessageUtil.copyIn(exchange);
- MessageUtil.transferToIn(in, tme);
- // Retrieve target
- ExchangeTarget target = getDestination(tme);
- target.configureTarget(tme, getContext());
- // Send in to target
- send(tme);
- // Mimic the exchange on the other side and send to needed listener
+ MessageUtil.transferTo(exchange, org, "out");
+ send(org);
} else {
- String id = (String) exchange.getProperty(correlation);
- if (id == null) {
- throw new IllegalStateException(correlation + " property not found");
- }
- MessageExchange org = (MessageExchange) store.load(id);
- if (org == null) {
- throw new IllegalStateException("Could not load original exchange with id " + id);
- }
- // Reproduce DONE status to the other side
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- done(org);
- // Reproduce ERROR status to the other side
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- fail(org, exchange.getError());
- // Reproduce faults to the other side and listeners
- } else if (exchange.getFault() != null) {
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferTo(exchange, org, "fault");
- send(org);
- // Reproduce answers to the other side
- } else if (exchange.getMessage("out") != null) {
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferTo(exchange, org, "out");
- send(org);
- } else {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
- }
- }
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
}
}
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/MessageFilter.java Mon Jun 5 03:57:19 2006
@@ -17,13 +17,12 @@
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.eip.support.MessageUtil;
@@ -42,8 +41,6 @@
*/
public class MessageFilter extends EIPEndpoint {
- private static final Log log = LogFactory.getLog(MessageFilter.class);
-
/**
* The main target destination which will receive the exchange
*/
@@ -127,46 +124,67 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
- public void process(MessageExchange exchange) throws Exception {
- try {
- // If we need to report errors, the behavior is really different,
- // as we need to keep the incoming exchange in the store until
- // all acks have been received
- if (reportErrors) {
- // TODO: implement this
- throw new UnsupportedOperationException("Not implemented");
- // We are in a simple fire-and-forget behaviour.
- // This implementation is really efficient as we do not use
- // the store at all.
- } else {
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- } else if (exchange instanceof InOnly == false &&
- exchange instanceof RobustInOnly == false) {
- fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
- } else if (exchange.getFault() != null) {
- done(exchange);
- } else {
- NormalizedMessage in = MessageUtil.copyIn(exchange);
- MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
- target.configureTarget(me, getContext());
- MessageUtil.transferToIn(in, me);
- if (filter.matches(me)) {
- send(me);
- }
+ protected void processSync(MessageExchange exchange) throws Exception {
+ if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ } else {
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ target.configureTarget(me, getContext());
+ MessageUtil.transferToIn(in, me);
+ if (filter.matches(me)) {
+ sendSync(me);
+ if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
+ fail(exchange, me.getError());
+ } else if (me.getStatus() == ExchangeStatus.DONE) {
done(exchange);
+ } else if (me.getFault() != null && reportErrors) {
+ Fault fault = MessageUtil.copyFault(me);
+ done(me);
+ MessageUtil.transferToFault(fault, exchange);
+ sendSync(exchange);
}
+ } else {
+ done(exchange);
}
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ // If we need to report errors, the behavior is really different,
+ // as we need to keep the incoming exchange in the store until
+ // all acks have been received
+ if (reportErrors) {
+ // TODO: implement this
+ throw new UnsupportedOperationException("Not implemented");
+ // We are in a simple fire-and-forget behaviour.
+ // This implementation is really efficient as we do not use
+ // the store at all.
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ } else if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ } else if (exchange.getFault() != null) {
+ done(exchange);
+ } else {
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ target.configureTarget(me, getContext());
+ MessageUtil.transferToIn(in, me);
+ if (filter.matches(me)) {
+ send(me);
+ }
+ done(exchange);
}
}
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Pipeline.java Mon Jun 5 03:57:19 2006
@@ -19,13 +19,12 @@
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.RobustInOnly;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.eip.support.MessageUtil;
@@ -48,8 +47,6 @@
*/
public class Pipeline extends EIPEndpoint {
- private static final Log log = LogFactory.getLog(Pipeline.class);
-
private static final String TRANSFORMER = "Pipeline.Transformer";
private static final String CONSUMER_MEP = "Pipeline.ConsumerMEP";
@@ -127,152 +124,213 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
- public void process(MessageExchange exchange) throws MessagingException {
- try {
- // The exchange comes from the consumer
- if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
- // A DONE status from the consumer can only be received
- // when a fault has been sent
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- String transformerId = (String) exchange.getProperty(correlationTransformer);
- String targetId = (String) exchange.getProperty(correlationTarget);
- if (transformerId == null && targetId == null) {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
- }
- // Load the exchange
- MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
- done(me);
- // Errors must be sent back to the target or transformer
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- String transformerId = (String) exchange.getProperty(correlationTransformer);
- String targetId = (String) exchange.getProperty(correlationTarget);
- if (transformerId == null && targetId == null) {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
- }
- // Load the exchange
- MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
- fail(me, exchange.getError());
- // This is a new exchange
- } else if (exchange.getProperty(correlationTransformer) == null) {
- if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
- fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
- return;
- }
- // Create exchange for target
- MessageExchange tme = exchangeFactory.createInOutExchange();
- transformer.configureTarget(tme, getContext());
- // Set correlations
- exchange.setProperty(correlationTransformer, tme.getExchangeId());
- tme.setProperty(correlationConsumer, exchange.getExchangeId());
- tme.setProperty(TRANSFORMER, Boolean.TRUE);
- tme.setProperty(CONSUMER_MEP, exchange.getPattern());
- // Put exchange to store
- store.store(exchange.getExchangeId(), exchange);
- // Send in to listener and target
- MessageUtil.transferInToIn(exchange, tme);
- send(tme);
- } else {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
- }
- // If the exchange comes from the transformer
- } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
- // Retrieve the correlation id
- String consumerId = (String) exchange.getProperty(correlationConsumer);
- if (consumerId == null) {
- throw new IllegalStateException(correlationConsumer + " property not found");
- }
- // This should not happen because the MEP is an In-Out
- // and the DONE status is always sent by the consumer (us)
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- throw new IllegalStateException("Received a DONE status from the transformer");
- // Errors must be sent back to the consumer
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- MessageExchange me = (MessageExchange) store.load(consumerId);
- fail(me, exchange.getError());
- // Faults must be sent back to the consumer
- } else if (exchange.getFault() != null) {
- MessageExchange me = (MessageExchange) store.load(consumerId);
- if (me instanceof InOnly) {
- // Do not use the fault as it may contain streams
- // So just transform it to a string and send an error
- String fault = new SourceTransformer().contentToString(exchange.getFault());
- fail(me, new FaultException(fault, null, null));
- done(exchange);
- } else {
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferFaultToFault(exchange, me);
- send(me);
- }
- // This is the answer from the transformer
- } else if (exchange.getMessage("out") != null) {
- // Retrieve the consumer MEP
- URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
- if (mep == null) {
- throw new IllegalStateException("Exchange does not carry the consumer MEP");
- }
- MessageExchange me = exchangeFactory.createExchange(mep);
- target.configureTarget(me, getContext());
- me.setProperty(correlationConsumer, consumerId);
- me.setProperty(correlationTransformer, exchange.getExchangeId());
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferOutToIn(exchange, me);
- send(me);
- // This should not happen
- } else {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
- }
- // The exchange comes from the target
+ protected void processSync(MessageExchange exchange) throws Exception {
+ if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ return;
+ }
+ // Create exchange for target
+ InOut tme = exchangeFactory.createInOutExchange();
+ transformer.configureTarget(tme, getContext());
+ // Send in to listener and target
+ MessageUtil.transferInToIn(exchange, tme);
+ sendSync(tme);
+ // Check result
+ if (tme.getStatus() == ExchangeStatus.DONE) {
+ throw new IllegalStateException("Received a DONE status from the transformer");
+ }
+ // Errors must be sent back to the consumer
+ else if (tme.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, tme.getError());
+ }
+ // Faults must be sent back to the consumer
+ else if (tme.getFault() != null) {
+ if (exchange instanceof InOnly) {
+ // Do not use the fault as it may contain streams
+ // So just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(tme.getFault());
+ done(tme);
+ fail(exchange, new FaultException(fault, null, null));
} else {
- // Retrieve the correlation id for the consumer
- String consumerId = (String) exchange.getProperty(correlationConsumer);
- if (consumerId == null) {
- throw new IllegalStateException(correlationConsumer + " property not found");
+ Fault fault = MessageUtil.copyFault(tme);
+ MessageUtil.transferToFault(fault, exchange);
+ done(tme);
+ sendSync(exchange);
+ }
+ }
+ // This should not happen
+ else if (tme.getOutMessage() == null) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+ }
+ // This is the answer from the transformer
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ target.configureTarget(me, getContext());
+ MessageUtil.transferOutToIn(tme, me);
+ sendSync(me);
+ done(tme);
+ if (me.getStatus() == ExchangeStatus.DONE) {
+ done(exchange);
+ } else if (me.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, me.getError());
+ } else if (me.getFault() != null) {
+ if (exchange instanceof InOnly) {
+ // Do not use the fault as it may contain streams
+ // So just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(me.getFault());
+ done(me);
+ fail(exchange, new FaultException(fault, null, null));
+ } else {
+ Fault fault = MessageUtil.copyFault(me);
+ MessageUtil.transferToFault(fault, exchange);
+ done(me);
+ sendSync(exchange);
+ }
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ // The exchange comes from the consumer
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // A DONE status from the consumer can only be received
+ // when a fault has been sent
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ String transformerId = (String) exchange.getProperty(correlationTransformer);
+ String targetId = (String) exchange.getProperty(correlationTarget);
+ if (transformerId == null && targetId == null) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
}
- // Retrieve the correlation id for the transformer
+ // Load the exchange
+ MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+ done(me);
+ // Errors must be sent back to the target or transformer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
String transformerId = (String) exchange.getProperty(correlationTransformer);
- if (transformerId == null) {
- throw new IllegalStateException(correlationTransformer + " property not found");
+ String targetId = (String) exchange.getProperty(correlationTarget);
+ if (transformerId == null && targetId == null) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE + " but has no correlation set");
}
- // This should be the last message received
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- // Need to ack the transformer
- MessageExchange tme = (MessageExchange) store.load(transformerId);
- done(tme);
- // Need to ack the consumer
- MessageExchange cme = (MessageExchange) store.load(consumerId);
- done(cme);
- // Errors should be sent back to the consumer
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- // Need to ack the transformer
- MessageExchange tme = (MessageExchange) store.load(transformerId);
- done(tme);
- // Send error to consumer
- MessageExchange cme = (MessageExchange) store.load(consumerId);
- fail(cme, exchange.getError());
- // If we have a robust-in-only MEP, we can receive a fault
- } else if (exchange.getFault() != null) {
- // Need to ack the transformer
- MessageExchange tme = (MessageExchange) store.load(transformerId);
- done(tme);
- // Send fault back to consumer
- store.store(exchange.getExchangeId(), exchange);
- MessageExchange cme = (MessageExchange) store.load(consumerId);
- cme.setProperty(correlationTarget, exchange.getExchangeId());
- MessageUtil.transferFaultToFault(exchange, cme);
- send(cme);
- // This should not happen
+ // Load the exchange
+ MessageExchange me = (MessageExchange) store.load(targetId != null ? targetId : transformerId);
+ fail(me, exchange.getError());
+ // This is a new exchange
+ } else if (exchange.getProperty(correlationTransformer) == null) {
+ if (exchange instanceof InOnly == false && exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ return;
+ }
+ // Create exchange for target
+ MessageExchange tme = exchangeFactory.createInOutExchange();
+ transformer.configureTarget(tme, getContext());
+ // Set correlations
+ exchange.setProperty(correlationTransformer, tme.getExchangeId());
+ tme.setProperty(correlationConsumer, exchange.getExchangeId());
+ tme.setProperty(TRANSFORMER, Boolean.TRUE);
+ tme.setProperty(CONSUMER_MEP, exchange.getPattern());
+ // Put exchange to store
+ store.store(exchange.getExchangeId(), exchange);
+ // Send in to listener and target
+ MessageUtil.transferInToIn(exchange, tme);
+ send(tme);
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no correlation set");
+ }
+ // If the exchange comes from the transformer
+ } else if (Boolean.TRUE.equals(exchange.getProperty(TRANSFORMER))) {
+ // Retrieve the correlation id
+ String consumerId = (String) exchange.getProperty(correlationConsumer);
+ if (consumerId == null) {
+ throw new IllegalStateException(correlationConsumer + " property not found");
+ }
+ // This should not happen because the MEP is an In-Out
+ // and the DONE status is always sent by the consumer (us)
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ throw new IllegalStateException("Received a DONE status from the transformer");
+ // Errors must be sent back to the consumer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ MessageExchange me = (MessageExchange) store.load(consumerId);
+ fail(me, exchange.getError());
+ // Faults must be sent back to the consumer
+ } else if (exchange.getFault() != null) {
+ MessageExchange me = (MessageExchange) store.load(consumerId);
+ if (me instanceof InOnly) {
+ // Do not use the fault itself, as it may contain streams,
+ // so just transform it to a string and send an error
+ String fault = new SourceTransformer().contentToString(exchange.getFault());
+ fail(me, new FaultException(fault, null, null));
+ done(exchange);
} else {
- throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferFaultToFault(exchange, me);
+ send(me);
+ }
+ // This is the answer from the transformer
+ } else if (exchange.getMessage("out") != null) {
+ // Retrieve the consumer MEP
+ URI mep = (URI) exchange.getProperty(CONSUMER_MEP);
+ if (mep == null) {
+ throw new IllegalStateException("Exchange does not carry the consumer MEP");
}
+ MessageExchange me = exchangeFactory.createExchange(mep);
+ target.configureTarget(me, getContext());
+ me.setProperty(correlationConsumer, consumerId);
+ me.setProperty(correlationTransformer, exchange.getExchangeId());
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferOutToIn(exchange, me);
+ send(me);
+ // This should not happen
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+ }
+ // The exchange comes from the target
+ } else {
+ // Retrieve the correlation id for the consumer
+ String consumerId = (String) exchange.getProperty(correlationConsumer);
+ if (consumerId == null) {
+ throw new IllegalStateException(correlationConsumer + " property not found");
}
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
+ // Retrieve the correlation id for the transformer
+ String transformerId = (String) exchange.getProperty(correlationTransformer);
+ if (transformerId == null) {
+ throw new IllegalStateException(correlationTransformer + " property not found");
+ }
+ // This should be the last message received
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // Need to ack the transformer
+ MessageExchange tme = (MessageExchange) store.load(transformerId);
+ done(tme);
+ // Need to ack the consumer
+ MessageExchange cme = (MessageExchange) store.load(consumerId);
+ done(cme);
+ // Errors should be sent back to the consumer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ // Need to ack the transformer
+ MessageExchange tme = (MessageExchange) store.load(transformerId);
+ done(tme);
+ // Send error to consumer
+ MessageExchange cme = (MessageExchange) store.load(consumerId);
+ fail(cme, exchange.getError());
+ // If we have a robust-in-only MEP, we can receive a fault
+ } else if (exchange.getFault() != null) {
+ // Need to ack the transformer
+ MessageExchange tme = (MessageExchange) store.load(transformerId);
+ done(tme);
+ // Send fault back to consumer
+ store.store(exchange.getExchangeId(), exchange);
+ MessageExchange cme = (MessageExchange) store.load(consumerId);
+ cme.setProperty(correlationTarget, exchange.getExchangeId());
+ MessageUtil.transferFaultToFault(exchange, cme);
+ send(cme);
+ // This should not happen
+ } else {
+ throw new IllegalStateException("Exchange from target has a " + ExchangeStatus.ACTIVE + " status but has no Fault message");
}
}
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Mon Jun 5 03:57:19 2006
@@ -19,12 +19,9 @@
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.eip.support.MessageUtil;
@@ -43,8 +40,6 @@
*/
public class StaticRecipientList extends EIPEndpoint {
- private static final Log log = LogFactory.getLog(WireTap.class);
-
/**
* List of recipients
*/
@@ -106,46 +101,60 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
- public void process(MessageExchange exchange) throws MessagingException {
- try {
- // If we need to report errors, the behavior is really different,
- // as we need to keep the incoming exchange in the store until
- // all acks have been received
- if (reportErrors) {
- // TODO: implement this
- throw new UnsupportedOperationException("Not implemented");
- // We are in a simple fire-and-forget behaviour.
- // This implementation is really efficient as we do not use
- // the store at all.
+ protected void processSync(MessageExchange exchange) throws Exception {
+ if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ return;
+ }
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ for (int i = 0; i < recipients.length; i++) {
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ recipients[i].configureTarget(me, getContext());
+ MessageUtil.transferToIn(in, me);
+ sendSync(me);
+ if (me.getStatus() == ExchangeStatus.ERROR && reportErrors) {
+ fail(exchange, me.getError());
+ return;
+ }
+ }
+ done(exchange);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ // If we need to report errors, the behavior is really different,
+ // as we need to keep the incoming exchange in the store until
+ // all acks have been received
+ if (reportErrors) {
+ // TODO: implement this
+ throw new UnsupportedOperationException("Not implemented");
+ // We are in a simple fire-and-forget behaviour.
+ // This implementation is really efficient as we do not use
+ // the store at all.
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ } else if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ } else if (exchange.getFault() != null) {
+ done(exchange);
} else {
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- } else if (exchange instanceof InOnly == false &&
- exchange instanceof RobustInOnly == false) {
- fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
- } else if (exchange.getFault() != null) {
- done(exchange);
- } else {
- NormalizedMessage in = MessageUtil.copyIn(exchange);
- for (int i = 0; i < recipients.length; i++) {
- MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
- recipients[i].configureTarget(me, getContext());
- MessageUtil.transferToIn(in, me);
- send(me);
- }
- done(exchange);
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ for (int i = 0; i < recipients.length; i++) {
+ MessageExchange me = exchangeFactory.createExchange(exchange.getPattern());
+ recipients[i].configureTarget(me, getContext());
+ MessageUtil.transferToIn(in, me);
+ send(me);
}
- }
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
+ done(exchange);
}
}
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java Mon Jun 5 03:57:19 2006
@@ -17,12 +17,10 @@
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.eip.support.MessageUtil;
@@ -45,8 +43,6 @@
*/
public class StaticRoutingSlip extends EIPEndpoint {
- private static final Log log = LogFactory.getLog(Pipeline.class);
-
/**
* List of target components used in the RoutingSlip
*/
@@ -94,114 +90,146 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
*/
- public void process(MessageExchange exchange) throws MessagingException {
- try {
- // This exchange comes from the consumer
- if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- String correlationId = (String) exchange.getProperty(correlation);
- if (correlationId == null) {
- throw new IllegalStateException(correlation + " property not found");
- }
- // Ack last target hit
- MessageExchange me = (MessageExchange) store.load(correlationId);
- done(me);
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- String correlationId = (String) exchange.getProperty(correlation);
- if (correlationId == null) {
- throw new IllegalStateException(correlation + " property not found");
- }
- // Ack last target hit
- MessageExchange me = (MessageExchange) store.load(correlationId);
- done(me);
- } else if (exchange instanceof InOut == false) {
- throw new IllegalStateException("Use an InOut MEP");
- } else {
- MessageExchange me = exchangeFactory.createInOutExchange();
- me.setProperty(correlation, exchange.getExchangeId());
- me.setProperty(index, new Integer(0));
- targets[0].configureTarget(me, getContext());
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferInToIn(exchange, me);
- send(me);
- }
- // The exchange comes from a target
+ protected void processSync(MessageExchange exchange) throws Exception {
+ if (exchange instanceof InOut == false) {
+ throw new IllegalStateException("Use an InOut MEP");
+ }
+ MessageExchange current = exchange;
+ for (int i = 0; i < targets.length; i++) {
+ InOut me = exchangeFactory.createInOutExchange();
+ targets[i].configureTarget(me, getContext());
+ if (i == 0) {
+ MessageUtil.transferInToIn(current, me);
} else {
+ MessageUtil.transferOutToIn(current, me);
+ }
+ sendSync(me);
+ if (i != 0) {
+ done(current);
+ }
+ if (me.getStatus() == ExchangeStatus.DONE) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
+ } else if (me.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, me.getError());
+ return;
+ } else if (me.getFault() != null) {
+ Fault fault = MessageUtil.copyFault(me);
+ MessageUtil.transferToFault(fault, exchange);
+ done(me);
+ sendSync(exchange);
+ return;
+ } else if (me.getOutMessage() == null) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+ }
+ current = me;
+ }
+ MessageUtil.transferToOut(MessageUtil.copyOut(current), exchange);
+ done(current);
+ sendSync(exchange);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ // This exchange comes from the consumer
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
String correlationId = (String) exchange.getProperty(correlation);
- String previousId = (String) exchange.getProperty(previous);
- Integer prevIndex = (Integer) exchange.getProperty(index);
if (correlationId == null) {
throw new IllegalStateException(correlation + " property not found");
}
- if (prevIndex == null) {
- throw new IllegalStateException(previous + " property not found");
+ // Ack last target hit
+ MessageExchange me = (MessageExchange) store.load(correlationId);
+ done(me);
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ String correlationId = (String) exchange.getProperty(correlation);
+ if (correlationId == null) {
+ throw new IllegalStateException(correlation + " property not found");
+ }
+ // Ack last target hit
+ MessageExchange me = (MessageExchange) store.load(correlationId);
+ done(me);
+ } else if (exchange instanceof InOut == false) {
+ throw new IllegalStateException("Use an InOut MEP");
+ } else {
+ MessageExchange me = exchangeFactory.createInOutExchange();
+ me.setProperty(correlation, exchange.getExchangeId());
+ me.setProperty(index, new Integer(0));
+ targets[0].configureTarget(me, getContext());
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferInToIn(exchange, me);
+ send(me);
+ }
+ // The exchange comes from a target
+ } else {
+ String correlationId = (String) exchange.getProperty(correlation);
+ String previousId = (String) exchange.getProperty(previous);
+ Integer prevIndex = (Integer) exchange.getProperty(index);
+ if (correlationId == null) {
+ throw new IllegalStateException(correlation + " property not found");
+ }
+ if (prevIndex == null) {
+ throw new IllegalStateException(previous + " property not found");
+ }
+ // This should never happen, as we can only send DONE
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
+ // Errors are sent back to the consumer
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ MessageExchange me = (MessageExchange) store.load(correlationId);
+ fail(me, exchange.getError());
+ // Ack the previous target
+ if (previousId != null) {
+ me = (MessageExchange) store.load(previousId);
+ done(me);
+ }
+ // Faults are sent back to the consumer
+ } else if (exchange.getFault() != null) {
+ MessageExchange me = (MessageExchange) store.load(correlationId);
+ me.setProperty(correlation, exchange.getExchangeId());
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferFaultToFault(exchange, me);
+ send(me);
+ // Ack the previous target
+ if (previousId != null) {
+ me = (MessageExchange) store.load(previousId);
+ done(me);
}
- // This should never happen, as we can only send DONE
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
- // ERROR are sent back to the consumer
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ // Out message, give it to next target or back to consumer
+ } else if (exchange.getMessage("out") != null) {
+ // This is the answer from the last target
+ if (prevIndex.intValue() == targets.length - 1) {
MessageExchange me = (MessageExchange) store.load(correlationId);
- fail(me, exchange.getError());
- // Ack the previous target
+ me.setProperty(correlation, exchange.getExchangeId());
+ store.store(exchange.getExchangeId(), exchange);
+ MessageUtil.transferOutToOut(exchange, me);
+ send(me);
if (previousId != null) {
me = (MessageExchange) store.load(previousId);
done(me);
}
- // Faults are sent back to the consumer
- } else if (exchange.getFault() != null) {
- MessageExchange me = (MessageExchange) store.load(correlationId);
- me.setProperty(correlation, exchange.getExchangeId());
+ // We still have a target to hit
+ } else {
+ MessageExchange me = exchangeFactory.createInOutExchange();
+ Integer curIndex = new Integer(prevIndex.intValue() + 1);
+ me.setProperty(correlation, correlationId);
+ me.setProperty(index, curIndex);
+ me.setProperty(previous, exchange.getExchangeId());
+ targets[curIndex.intValue()].configureTarget(me, getContext());
store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferFaultToFault(exchange, me);
+ MessageUtil.transferOutToIn(exchange, me);
send(me);
- // Ack the previous target
if (previousId != null) {
me = (MessageExchange) store.load(previousId);
done(me);
}
- // Out message, give it to next target or back to consumer
- } else if (exchange.getMessage("out") != null) {
- // This is the answer from the last target
- if (prevIndex.intValue() == targets.length - 1) {
- MessageExchange me = (MessageExchange) store.load(correlationId);
- me.setProperty(correlation, exchange.getExchangeId());
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferOutToOut(exchange, me);
- send(me);
- if (previousId != null) {
- me = (MessageExchange) store.load(previousId);
- done(me);
- }
- // We still have a target to hit
- } else {
- MessageExchange me = exchangeFactory.createInOutExchange();
- Integer curIndex = new Integer(prevIndex.intValue() + 1);
- me.setProperty(correlation, correlationId);
- me.setProperty(index, curIndex);
- me.setProperty(previous, exchange.getExchangeId());
- targets[curIndex.intValue()].configureTarget(me, getContext());
- store.store(exchange.getExchangeId(), exchange);
- MessageUtil.transferOutToIn(exchange, me);
- send(me);
- if (previousId != null) {
- me = (MessageExchange) store.load(previousId);
- done(me);
- }
- }
- // This should not happen
- } else {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
}
- }
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
+ // This should not happen
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
}
}
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/WireTap.java Mon Jun 5 03:57:19 2006
@@ -19,11 +19,8 @@
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.eip.support.ExchangeTarget;
@@ -47,8 +44,6 @@
*/
public class WireTap extends EIPEndpoint {
- private static final Log log = LogFactory.getLog(WireTap.class);
-
/**
* The main target destination which will receive the exchange
*/
@@ -140,70 +135,85 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processSync(MessageExchange exchange) throws Exception {
+ // Create exchange for target
+ MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+ target.configureTarget(tme, getContext());
+ sendSyncToListenerAndTarget(exchange, tme, inListener, "in");
+ if (tme.getStatus() == ExchangeStatus.DONE) {
+ done(exchange);
+ } else if (tme.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, tme.getError());
+ } else if (tme.getFault() != null) {
+ sendSyncToListenerAndTarget(tme, exchange, faultListener, "fault");
+ done(tme);
+ } else if (tme.getMessage("out") != null) {
+ sendSyncToListenerAndTarget(tme, exchange, outListener, "out");
+ done(tme);
+ } else {
+ done(tme);
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
*/
- public void process(MessageExchange exchange) throws MessagingException {
- try {
- if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
- exchange.getProperty(correlation) == null) {
- // Create exchange for target
- MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
- if (store.hasFeature(Store.CLUSTERED)) {
- exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
- tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+ exchange.getProperty(correlation) == null) {
+ // Create exchange for target
+ MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+ if (store.hasFeature(Store.CLUSTERED)) {
+ exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+ tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+ }
+ target.configureTarget(tme, getContext());
+ // Set correlations
+ exchange.setProperty(correlation, tme.getExchangeId());
+ tme.setProperty(correlation, exchange.getExchangeId());
+ // Put exchange to store
+ store.store(exchange.getExchangeId(), exchange);
+ // Send in to listener and target
+ sendToListenerAndTarget(exchange, tme, inListener, "in");
+ // Mimic the exchange on the other side and send to needed listener
+ } else {
+ String id = (String) exchange.getProperty(correlation);
+ if (id == null) {
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
+ exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ // This must be a listener status, so ignore
+ return;
}
- target.configureTarget(tme, getContext());
- // Set correlations
- exchange.setProperty(correlation, tme.getExchangeId());
- tme.setProperty(correlation, exchange.getExchangeId());
- // Put exchange to store
+ throw new IllegalStateException(correlation + " property not found");
+ }
+ MessageExchange org = (MessageExchange) store.load(id);
+ if (org == null) {
+ throw new IllegalStateException("Could not load original exchange with id " + id);
+ }
+ // Reproduce DONE status to the other side
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(org);
+ // Reproduce ERROR status to the other side
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(org, exchange.getError());
+ // Reproduce faults to the other side and listeners
+ } else if (exchange.getFault() != null) {
store.store(exchange.getExchangeId(), exchange);
- // Send in to listener and target
- sendToTargetAndListener(exchange, tme, inListener, "in");
- // Mimic the exchange on the other side and send to needed listener
+ sendToListenerAndTarget(exchange, org, faultListener, "fault");
+ // Reproduce answers to the other side
+ } else if (exchange.getMessage("out") != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ sendToListenerAndTarget(exchange, org, outListener, "out");
} else {
- String id = (String) exchange.getProperty(correlation);
- if (id == null) {
- if (exchange.getRole() == MessageExchange.Role.CONSUMER &&
- exchange.getStatus() != ExchangeStatus.ACTIVE) {
- // This must be a listener status, so ignore
- return;
- }
- throw new IllegalStateException(correlation + " property not found");
- }
- MessageExchange org = (MessageExchange) store.load(id);
- if (org == null) {
- throw new IllegalStateException("Could not load original exchange with id " + id);
- }
- // Reproduce DONE status to the other side
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- done(org);
- // Reproduce ERROR status to the other side
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- fail(org, exchange.getError());
- // Reproduce faults to the other side and listeners
- } else if (exchange.getFault() != null) {
- store.store(exchange.getExchangeId(), exchange);
- sendToTargetAndListener(exchange, org, faultListener, "fault");
- // Reproduce answers to the other side
- } else if (exchange.getMessage("out") != null) {
- store.store(exchange.getExchangeId(), exchange);
- sendToTargetAndListener(exchange, org, outListener, "out");
- } else {
- throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
- }
- }
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
}
}
}
- private void sendToTargetAndListener(MessageExchange source,
+ private void sendToListenerAndTarget(MessageExchange source,
MessageExchange dest,
ExchangeTarget listener,
String message) throws Exception {
@@ -219,8 +229,29 @@
MessageUtil.transferTo(msg, dest, message);
send(dest);
} else {
- MessageUtil.transferTo(source, dest, message);
- send(dest);
+ MessageUtil.transferTo(source, dest, message);
+ send(dest);
+ }
+ }
+
+ private void sendSyncToListenerAndTarget(MessageExchange source,
+ MessageExchange dest,
+ ExchangeTarget listener,
+ String message) throws Exception {
+ if (listener != null) {
+ NormalizedMessage msg = MessageUtil.copy(source.getMessage(message));
+ InOnly lme = exchangeFactory.createInOnlyExchange();
+ if (store.hasFeature(Store.CLUSTERED)) {
+ lme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+ }
+ listener.configureTarget(lme, getContext());
+ MessageUtil.transferToIn(msg, lme);
+ sendSync(lme);
+ MessageUtil.transferTo(msg, dest, message);
+ sendSync(dest);
+ } else {
+ MessageUtil.transferTo(source, dest, message);
+ sendSync(dest);
}
}
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Jun 5 03:57:19 2006
@@ -53,10 +53,26 @@
private ExchangeTarget target;
private boolean rescheduleTimeouts;
+
+ private boolean synchronous;
private ConcurrentMap closedAggregates = new ConcurrentHashMap();
/**
+ * @return the value of the synchronous flag
+ */
+ public boolean isSynchronous() {
+ return synchronous;
+ }
+
+ /**
+ * @param synchronous the new value of the synchronous flag
+ */
+ public void setSynchronous(boolean synchronous) {
+ this.synchronous = synchronous;
+ }
+
+ /**
* @return the rescheduleTimeouts
*/
public boolean isRescheduleTimeouts() {
@@ -84,80 +100,84 @@
this.target = target;
}
- /*(non-Javadoc)
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processSync(MessageExchange exchange) throws Exception {
+ throw new IllegalStateException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ throw new IllegalStateException();
+ }
+
+ /* (non-Javadoc) Feeds InOnly / RobustInOnly provider exchanges into the aggregation matching their correlation id.
* @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
*/
public void process(MessageExchange exchange) throws Exception {
- try {
- // Skip DONE
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- // Skip ERROR
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- // Handle an ACTIVE exchange as a PROVIDER
- } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
- if (exchange instanceof InOnly == false &&
- exchange instanceof RobustInOnly == false) {
- fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
- } else {
- NormalizedMessage in = MessageUtil.copyIn(exchange);
- final String correlationId = getCorrelationID(exchange, in);
- if (correlationId == null || correlationId.length() == 0) {
- throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
- }
- // Load existing aggregation
- Lock lock = getLockManager().getLock(correlationId);
- lock.lock();
- try {
- Object aggregation = store.load(correlationId);
- Date timeout = null;
- // Create a new aggregate
- if (aggregation == null) {
- if (isAggregationClosed(correlationId)) {
- // TODO: should we return an error here ?
- } else {
- aggregation = createAggregation(correlationId);
- timeout = getTimeout(aggregation);
- }
- } else if (isRescheduleTimeouts()) {
+ // Skip DONE: nothing to do for an exchange that is already completed
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ // Skip ERROR: nothing to do for an exchange that is already in error
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ // Handle an ACTIVE exchange as a PROVIDER
+ } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ if (exchange instanceof InOnly == false &&
+ exchange instanceof RobustInOnly == false) {
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ } else {
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ final String correlationId = getCorrelationID(exchange, in);
+ if (correlationId == null || correlationId.length() == 0) {
+ throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
+ }
+ // Load existing aggregation while holding the lock obtained for this correlation id
+ Lock lock = getLockManager().getLock(correlationId);
+ lock.lock();
+ try {
+ Object aggregation = store.load(correlationId);
+ Date timeout = null;
+ // Create a new aggregate, unless this correlation id has already been closed
+ if (aggregation == null) {
+ if (isAggregationClosed(correlationId)) {
+ // TODO: should we return an error here ?
+ } else {
+ aggregation = createAggregation(correlationId);
timeout = getTimeout(aggregation);
-
}
- // If the aggregation is not closed
- if (aggregation != null) {
- if (addMessage(aggregation, in, exchange)) {
- sendAggregate(correlationId, aggregation, false);
- } else {
- store.store(correlationId, aggregation);
- if (timeout != null) {
- if (log.isDebugEnabled()) {
- log.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
- }
- getTimerManager().schedule(new TimerListener() {
- public void timerExpired(Timer timer) {
- AbstractAggregator.this.onTimeout(correlationId);
- }
- }, timeout);
+ } else if (isRescheduleTimeouts()) {
+ timeout = getTimeout(aggregation);
+ }
+ // If the aggregation is not closed: send it when addMessage returns true, else store it and schedule any timeout
+ if (aggregation != null) {
+ if (addMessage(aggregation, in, exchange)) {
+ sendAggregate(correlationId, aggregation, false);
+ } else {
+ store.store(correlationId, aggregation);
+ if (timeout != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Scheduling timeout at " + timeout + " for aggregate " + correlationId);
}
+ getTimerManager().schedule(new TimerListener() {
+ public void timerExpired(Timer timer) {
+ AbstractAggregator.this.onTimeout(correlationId);
+ }
+ }, timeout);
}
}
- done(exchange);
- } finally {
- lock.unlock();
}
+ done(exchange);
+ } finally {
+ lock.unlock();
}
- // Handle an ACTIVE exchange as a CONSUMER
- } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- done(exchange);
- }
- // If an error occurs, log it and report the error back to the sender
- // if the exchange is still ACTIVE
- } catch (Exception e) {
- log.error("An exception occured while processing exchange", e);
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, e);
}
+ // Handle an ACTIVE exchange as a CONSUMER: it is simply acknowledged with done()
+ } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ done(exchange);
}
}
@@ -170,7 +190,11 @@
me.setInMessage(nm);
buildAggregate(aggregation, nm, me, timeout);
closeAggregation(correlationId);
- send(me);
+ if (isSynchronous()) {
+ sendSync(me);
+ } else {
+ send(me);
+ }
}
protected void onTimeout(String correlationId) {